Compare commits

..

5 Commits

Author SHA1 Message Date
Alexander Whitestone
27925c9a5d wip: fix mission bus test loader
Some checks are pending
CI / test (pull_request) Waiting to run
CI / validate (pull_request) Waiting to run
Review Approval Gate / verify-review (pull_request) Waiting to run
2026-04-15 04:09:32 -04:00
Alexander Whitestone
417f92b7a5 wip: add mission bus module and profiles 2026-04-15 04:08:47 -04:00
Alexander Whitestone
de3f41e819 wip: add mission bus regression tests 2026-04-15 04:06:28 -04:00
bd0497b998 Merge PR #1585: docs: add night shift prediction report (#1353) 2026-04-15 06:13:22 +00:00
Alexander Whitestone
4ab84a59ab docs: add night shift prediction report (#1353)
Some checks are pending
CI / test (pull_request) Waiting to run
CI / validate (pull_request) Waiting to run
Review Approval Gate / verify-review (pull_request) Waiting to run
2026-04-15 02:02:26 -04:00
11 changed files with 684 additions and 693 deletions

View File

@@ -121,7 +121,6 @@ Do not tell people to static-serve the repo root and expect a world.
### What you can run now
- `python3 server.py` for the local websocket bridge
- `python3 bin/ws_load_test.py --url ws://127.0.0.1:8765 --concurrency 25 --rounds 3` for concurrent WebSocket session benchmarking with memory profiling
- Python modules under `nexus/` for heartbeat / cognition work
### Browser world restoration path

View File

@@ -1,224 +0,0 @@
#!/usr/bin/env python3
"""WebSocket load test harness for Nexus gateway infrastructure."""
from __future__ import annotations
import argparse
import asyncio
import json
import math
import sys
import time
import tracemalloc
from pathlib import Path
from typing import Any, Callable
try:
import resource
except ImportError: # pragma: no cover - not expected on Unix CI, but keep portable
resource = None
try:
import websockets
except ImportError: # pragma: no cover - tests inject connector
websockets = None
def percentile(values: list[float], pct: float) -> float:
    """Return the pct-th percentile of *values* (linear interpolation), rounded to 1 decimal.

    An empty input yields 0.0; a single sample is returned as-is (rounded).
    """
    if not values:
        return 0.0
    data = sorted(float(item) for item in values)
    if len(data) == 1:
        return round(data[0], 1)
    position = (len(data) - 1) * (pct / 100.0)
    low = math.floor(position)
    high = math.ceil(position)
    if low == high:
        return round(data[low], 1)
    fraction = position - low
    blended = data[low] + (data[high] - data[low]) * fraction
    return round(blended, 1)
def measure_memory() -> dict[str, int | None]:
current_bytes, peak_bytes = tracemalloc.get_traced_memory()
rss_bytes = None
if resource is not None:
try:
rss = int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
rss_bytes = rss if sys.platform == "darwin" else rss * 1024
except Exception:
rss_bytes = None
return {
"rss_bytes": rss_bytes,
"tracemalloc_current_bytes": int(current_bytes),
"tracemalloc_peak_bytes": int(peak_bytes),
}
def write_report(path: str | Path, report: dict[str, Any]) -> None:
output = Path(path)
output.write_text(json.dumps(report, indent=2) + "\n")
def _normalize_payload(payload: Any) -> str | None:
if payload is None:
return None
if isinstance(payload, str):
return payload
return json.dumps(payload)
async def _connect_once(
url: str,
hold_seconds: float,
payload: str | None,
connector: Callable[[str], Any],
) -> dict[str, Any]:
start = time.perf_counter()
try:
async with connector(url) as websocket:
connect_ms = (time.perf_counter() - start) * 1000
messages_sent = 0
if payload is not None:
await websocket.send(payload)
messages_sent = 1
if hold_seconds > 0:
await asyncio.sleep(hold_seconds)
return {
"success": True,
"connect_ms": connect_ms,
"messages_sent": messages_sent,
}
except Exception as exc: # pragma: no cover - exercised in live use
return {
"success": False,
"connect_ms": (time.perf_counter() - start) * 1000,
"messages_sent": 0,
"error": str(exc),
}
async def run_load_test(
    *,
    url: str,
    concurrency: int,
    rounds: int,
    hold_seconds: float = 0.1,
    payload: Any = None,
    connector: Callable[[str], Any] | None = None,
) -> dict[str, Any]:
    """Run `rounds` sequential waves of `concurrency` concurrent connections.

    Args:
        url: WebSocket endpoint to exercise.
        concurrency: Connections opened simultaneously per round.
        rounds: Number of sequential waves to run.
        hold_seconds: How long each connection stays open after connecting.
        payload: Optional message sent after connect; non-strings are JSON-encoded.
        connector: Factory returning an async context manager for a connection;
            defaults to ``websockets.connect`` (tests inject a fake here).

    Returns:
        A report dict with connection counts, latency stats (ms), wall time,
        and tracemalloc/RSS memory snapshots taken before and after the run.

    Raises:
        ValueError: If ``concurrency`` or ``rounds`` is < 1.
        RuntimeError: If no connector is given and websockets is not installed.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    if rounds < 1:
        raise ValueError("rounds must be >= 1")
    if connector is None:
        if websockets is None:
            raise RuntimeError("websockets package is required for live load testing")
        # Rebinds the parameter name with a default live connector.
        def connector(target_url: str):
            return websockets.connect(target_url, open_timeout=10)
    payload_text = _normalize_payload(payload)
    # Only start (and later stop) tracemalloc when this call owns the session,
    # so an outer profiler's tracing is left untouched.
    started_tracing = False
    if not tracemalloc.is_tracing():
        tracemalloc.start()
        started_tracing = True
    memory_before = measure_memory()
    wall_start = time.perf_counter()
    results: list[dict[str, Any]] = []
    for _ in range(rounds):
        tasks = [
            asyncio.create_task(
                _connect_once(
                    url=url,
                    hold_seconds=hold_seconds,
                    payload=payload_text,
                    connector=connector,
                )
            )
            for _ in range(concurrency)
        ]
        results.extend(await asyncio.gather(*tasks))
    wall_time_ms = round((time.perf_counter() - wall_start) * 1000, 1)
    memory_after = measure_memory()
    if started_tracing:
        tracemalloc.stop()
    attempted = len(results)
    successful = sum(1 for result in results if result["success"])
    failed = attempted - successful
    # Latency statistics are computed over successful connections only.
    latencies = [result["connect_ms"] for result in results if result["success"]]
    messages_sent = sum(result["messages_sent"] for result in results)
    errors = [result.get("error") for result in results if result.get("error")]
    avg_connect_ms = round(sum(latencies) / len(latencies), 1) if latencies else 0.0
    min_connect_ms = round(min(latencies), 1) if latencies else 0.0
    max_connect_ms = round(max(latencies), 1) if latencies else 0.0
    return {
        "url": url,
        "concurrency": concurrency,
        "rounds": rounds,
        "hold_seconds": hold_seconds,
        "attempted_connections": attempted,
        "successful_connections": successful,
        "failed_connections": failed,
        "messages_sent": messages_sent,
        "success_rate": round(successful / attempted, 4) if attempted else 0.0,
        "avg_connect_ms": avg_connect_ms,
        "min_connect_ms": min_connect_ms,
        "max_connect_ms": max_connect_ms,
        "p95_connect_ms": percentile(latencies, 95),
        "wall_time_ms": wall_time_ms,
        "memory_before": memory_before,
        "memory_after": memory_after,
        # Tracemalloc peak growth over the run, clamped at zero.
        "memory_peak_delta_bytes": max(
            0,
            memory_after["tracemalloc_peak_bytes"] - memory_before["tracemalloc_peak_bytes"],
        ),
        # Cap the error list so the report stays small under mass failure.
        "errors": errors[:5],
    }
def _parse_payload(raw: str | None) -> Any:
if raw is None:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
return raw
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments, run the load test, emit the JSON report.

    Always prints the report to stdout; also writes it to --output when given.
    Returns 0 on completion.
    """
    parser = argparse.ArgumentParser(description="WebSocket load test harness for Nexus")
    parser.add_argument("--url", required=True, help="WebSocket URL to exercise, e.g. ws://127.0.0.1:8765")
    parser.add_argument("--concurrency", type=int, default=10, help="Concurrent connections per round")
    parser.add_argument("--rounds", type=int, default=1, help="Number of connection rounds to execute")
    parser.add_argument("--hold-seconds", type=float, default=0.1, help="How long to keep each connection open")
    parser.add_argument("--payload", help="Optional message payload to send after connect; JSON accepted")
    parser.add_argument("--output", help="Optional path to write JSON report")
    options = parser.parse_args(argv)
    report = asyncio.run(
        run_load_test(
            url=options.url,
            concurrency=options.concurrency,
            rounds=options.rounds,
            hold_seconds=options.hold_seconds,
            payload=_parse_payload(options.payload),
        )
    )
    if options.output:
        write_report(options.output, report)
    print(json.dumps(report, indent=2))
    return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,34 @@
{
"roles": {
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
"write": ["publish", "checkpoint", "handoff", "read"],
"read": ["read"],
"audit": ["read", "audit"]
},
"isolation_profiles": [
{
"name": "level1-directory",
"label": "Level 1 — directory workspace",
"level": 1,
"mechanism": "directory_workspace",
"description": "Single mission cell in an isolated workspace directory.",
"supports_resume": true
},
{
"name": "level2-mount-namespace",
"label": "Level 2 — mount namespace",
"level": 2,
"mechanism": "mount_namespace",
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
"supports_resume": true
},
{
"name": "level3-rootless-podman",
"label": "Level 3 — rootless Podman",
"level": 3,
"mechanism": "rootless_podman",
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
"supports_resume": true
}
]
}

31
docs/mission-bus.md Normal file
View File

@@ -0,0 +1,31 @@
# Mission Bus
The Mission Bus grounds the multi-agent teaming epic with a concrete, executable shared module.
## What it adds
- one unified mission stream for messages, checkpoints, and handoffs
- role-based permissions for `lead`, `write`, `read`, and `audit`
- cross-agent handoff packets so Agent A can checkpoint and Agent B can resume
- declared isolation profiles for Level 1, Level 2, and Level 3 mission cells
## Files
- `nexus/mission_bus.py`
- `config/mission_bus_profiles.json`
## Example
```python
from nexus.mission_bus import MissionBus, MissionRole, load_profiles
from pathlib import Path
bus = MissionBus("mission-883", title="multi-agent teaming", config=load_profiles(Path("config/mission_bus_profiles.json")))
bus.register_participant("timmy", MissionRole.LEAD)
bus.register_participant("ezra", MissionRole.WRITE)
checkpoint = bus.create_checkpoint("ezra", summary="checkpoint", state={"branch": "fix/883"})
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume from here")
packet = bus.build_resume_packet(bus.events[-1].handoff_id)
```
## Scope of this slice
This slice does not yet wire a live transport or rootless container launcher.
It codifies the mission bus contract, role permissions, handoff packet, and isolation profile surface so later work can execute against a stable interface.

View File

@@ -14,6 +14,16 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.mission_bus import (
MissionBus,
MissionRole,
MissionParticipant,
MissionMessage,
MissionCheckpoint,
MissionHandoff,
IsolationProfile,
load_profiles,
)
try:
from nexus.nexus_think import NexusMind
@@ -28,5 +38,13 @@ __all__ = [
"Action",
"ExperienceStore",
"TrajectoryLogger",
"MissionBus",
"MissionRole",
"MissionParticipant",
"MissionMessage",
"MissionCheckpoint",
"MissionHandoff",
"IsolationProfile",
"load_profiles",
"NexusMind",
]

358
nexus/mission_bus.py Normal file
View File

@@ -0,0 +1,358 @@
"""Mission bus, role permissions, cross-agent handoff, and isolation profiles.
Grounded implementation slice for #883.
The bus gives a single mission cell a unified event stream, permission-checked
roles, checkpoint + resume handoff, and declared isolation profiles for Level
1/2/3 execution boundaries.
"""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Union
# Built-in fallback configuration used when no profiles file is supplied.
# Mirrors config/mission_bus_profiles.json: role name -> capability list, plus
# the three declared isolation levels (directory, mount namespace, rootless
# Podman).  Callers deep-copy this (via a JSON round-trip) before mutating.
DEFAULT_CONFIG = {
    "roles": {
        "lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
        "write": ["publish", "checkpoint", "handoff", "read"],
        "read": ["read"],
        "audit": ["read", "audit"],
    },
    "isolation_profiles": [
        {
            "name": "level1-directory",
            "label": "Level 1 — directory workspace",
            "level": 1,
            "mechanism": "directory_workspace",
            "description": "Single mission cell in an isolated workspace directory.",
            "supports_resume": True,
        },
        {
            "name": "level2-mount-namespace",
            "label": "Level 2 — mount namespace",
            "level": 2,
            "mechanism": "mount_namespace",
            "description": "Mount-namespace isolation with explicit mission-cell mounts.",
            "supports_resume": True,
        },
        {
            "name": "level3-rootless-podman",
            "label": "Level 3 — rootless Podman",
            "level": 3,
            "mechanism": "rootless_podman",
            "description": "Rootless Podman cell for the strongest process and filesystem containment.",
            "supports_resume": True,
        },
    ],
}
def utcnow_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    now = datetime.now(tz=timezone.utc)
    return now.isoformat()
def load_profiles(path: Path) -> Dict[str, Any]:
    """Load mission-bus configuration from *path*, falling back to DEFAULT_CONFIG.

    A missing file yields a deep copy of DEFAULT_CONFIG (JSON round-trip).
    Missing top-level keys in an existing file are filled in from
    DEFAULT_CONFIG — also as deep copies, so callers can freely mutate the
    returned structure.  The previous ``setdefault(key, DEFAULT_CONFIG[key])``
    handed out references to DEFAULT_CONFIG's nested dicts/lists, letting a
    caller corrupt the module-level defaults.
    """
    if not path.exists():
        return json.loads(json.dumps(DEFAULT_CONFIG))
    with open(path, "r", encoding="utf-8") as handle:
        data = json.load(handle)
    data.setdefault("roles", json.loads(json.dumps(DEFAULT_CONFIG["roles"])))
    data.setdefault("isolation_profiles", json.loads(json.dumps(DEFAULT_CONFIG["isolation_profiles"])))
    return data
class MissionRole(str, Enum):
    """Participant roles; values match the keys of the config's 'roles' map."""

    LEAD = "lead"
    WRITE = "write"
    READ = "read"
    AUDIT = "audit"
@dataclass
class IsolationProfile:
    """Declared execution boundary for a mission cell (Level 1/2/3)."""

    name: str
    label: str
    level: int
    mechanism: str
    description: str = ""
    supports_resume: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain JSON-compatible dict."""
        return dict(
            name=self.name,
            label=self.label,
            level=self.level,
            mechanism=self.mechanism,
            description=self.description,
            supports_resume=self.supports_resume,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "IsolationProfile":
        """Build a profile from a dict, tolerating missing optional fields."""
        return cls(
            data["name"],
            data["label"],
            int(data["level"]),
            data["mechanism"],
            data.get("description", ""),
            bool(data.get("supports_resume", True)),
        )
@dataclass
class MissionParticipant:
    """A named agent registered on the bus with a single role."""

    name: str
    role: MissionRole
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; the role collapses to its string value."""
        return dict(name=self.name, role=self.role.value, metadata=self.metadata)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MissionParticipant":
        """Rehydrate from to_dict() output; metadata defaults to empty."""
        role = MissionRole(data["role"])
        return cls(name=data["name"], role=role, metadata=data.get("metadata", {}))
@dataclass
class MissionMessage:
    """A published message event on the unified mission stream."""

    sender: str
    topic: str
    payload: Dict[str, Any]
    sequence: int
    timestamp: str = field(default_factory=utcnow_iso)
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Fixed discriminator read by event_from_dict; not settable by callers.
    event_type: str = field(default="message", init=False)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict suitable for JSON transport."""
        return dict(
            event_type=self.event_type,
            sender=self.sender,
            topic=self.topic,
            payload=self.payload,
            sequence=self.sequence,
            timestamp=self.timestamp,
            message_id=self.message_id,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MissionMessage":
        """Rehydrate from a dict, accepting camelCase 'messageId' as well."""
        identifier = data.get("message_id") or data.get("messageId") or str(uuid.uuid4())
        return cls(
            sender=data["sender"],
            topic=data["topic"],
            payload=data["payload"],
            sequence=int(data["sequence"]),
            timestamp=data.get("timestamp", utcnow_iso()),
            message_id=identifier,
        )
@dataclass
class MissionCheckpoint:
    """A saved unit of mission state that a handoff can point at."""

    sender: str
    summary: str
    state: Dict[str, Any]
    sequence: int
    artifacts: List[str] = field(default_factory=list)
    timestamp: str = field(default_factory=utcnow_iso)
    checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Fixed discriminator read by event_from_dict; not settable by callers.
    event_type: str = field(default="checkpoint", init=False)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict suitable for JSON transport."""
        return dict(
            event_type=self.event_type,
            sender=self.sender,
            summary=self.summary,
            state=self.state,
            artifacts=self.artifacts,
            sequence=self.sequence,
            timestamp=self.timestamp,
            checkpoint_id=self.checkpoint_id,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MissionCheckpoint":
        """Rehydrate from a dict, accepting camelCase 'checkpointId' as well."""
        identifier = data.get("checkpoint_id") or data.get("checkpointId") or str(uuid.uuid4())
        return cls(
            sender=data["sender"],
            summary=data["summary"],
            state=data.get("state", {}),
            artifacts=list(data.get("artifacts", [])),
            sequence=int(data["sequence"]),
            timestamp=data.get("timestamp", utcnow_iso()),
            checkpoint_id=identifier,
        )
@dataclass
class MissionHandoff:
    """A handoff event pointing one participant at another's checkpoint."""

    sender: str
    recipient: str
    checkpoint_id: str
    sequence: int
    note: str = ""
    timestamp: str = field(default_factory=utcnow_iso)
    handoff_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Fixed discriminator read by event_from_dict; not settable by callers.
    event_type: str = field(default="handoff", init=False)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict suitable for JSON transport."""
        return dict(
            event_type=self.event_type,
            sender=self.sender,
            recipient=self.recipient,
            checkpoint_id=self.checkpoint_id,
            note=self.note,
            sequence=self.sequence,
            timestamp=self.timestamp,
            handoff_id=self.handoff_id,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MissionHandoff":
        """Rehydrate from a dict, accepting camelCase key variants.

        checkpoint_id is required (KeyError if neither spelling is present);
        handoff_id falls back to a fresh UUID.
        """
        if "checkpoint_id" in data:
            checkpoint_ref = data["checkpoint_id"]
        else:
            checkpoint_ref = data["checkpointId"]
        identifier = data.get("handoff_id") or data.get("handoffId") or str(uuid.uuid4())
        return cls(
            sender=data["sender"],
            recipient=data["recipient"],
            checkpoint_id=checkpoint_ref,
            note=data.get("note", ""),
            sequence=int(data["sequence"]),
            timestamp=data.get("timestamp", utcnow_iso()),
            handoff_id=identifier,
        )
MissionEvent = Union[MissionMessage, MissionCheckpoint, MissionHandoff]

# Discriminator value -> event class, used to rebuild serialized events.
_EVENT_CLASSES = {
    "message": MissionMessage,
    "checkpoint": MissionCheckpoint,
    "handoff": MissionHandoff,
}


def event_from_dict(data: Dict[str, Any]) -> MissionEvent:
    """Rebuild a typed mission event from its serialized dict form."""
    kind = data["event_type"]
    factory = _EVENT_CLASSES.get(kind)
    if factory is None:
        raise ValueError(f"Unknown mission event type: {kind}")
    return factory.from_dict(data)
class MissionBus:
    """Unified event stream for one mission cell.

    Holds registered participants, enforces role-based capabilities, and
    records messages, checkpoints, and handoffs in a single ordered list so
    an agent can replay the stream or resume from a checkpoint.
    """

    def __init__(self, mission_id: str, title: str = "", config: Dict[str, Any] | None = None):
        """Create a bus; with no config, deep-copies DEFAULT_CONFIG via JSON round-trip."""
        self.mission_id = mission_id
        self.title = title
        self.config = config or json.loads(json.dumps(DEFAULT_CONFIG))
        # role name -> set of capability strings
        self.role_permissions = {
            role: set(perms) for role, perms in self.config.get("roles", {}).items()
        }
        self.isolation_profiles = [
            IsolationProfile.from_dict(entry) for entry in self.config.get("isolation_profiles", [])
        ]
        self.participants: Dict[str, MissionParticipant] = {}
        # Ordered, append-only event stream (messages, checkpoints, handoffs).
        self.events: List[MissionEvent] = []

    def register_participant(self, name: str, role: MissionRole, metadata: Dict[str, Any] | None = None) -> MissionParticipant:
        """Add (or replace) a participant under *name* with the given role."""
        participant = MissionParticipant(name=name, role=role, metadata=metadata or {})
        self.participants[name] = participant
        return participant

    def allowed(self, name: str, capability: str) -> bool:
        """Return True if *name* is registered and its role grants *capability*."""
        participant = self.participants.get(name)
        if participant is None:
            return False
        return capability in self.role_permissions.get(participant.role.value, set())

    def _require(self, name: str, capability: str) -> None:
        """Raise PermissionError unless *name* holds *capability*."""
        if not self.allowed(name, capability):
            raise PermissionError(f"{name} lacks '{capability}' permission")

    def _next_sequence(self) -> int:
        """Sequence numbers are 1-based positions in the event stream."""
        return len(self.events) + 1

    def publish(self, sender: str, topic: str, payload: Dict[str, Any]) -> MissionMessage:
        """Append a message event; requires the 'publish' capability."""
        self._require(sender, "publish")
        event = MissionMessage(sender=sender, topic=topic, payload=payload, sequence=self._next_sequence())
        self.events.append(event)
        return event

    def create_checkpoint(
        self,
        sender: str,
        summary: str,
        state: Dict[str, Any],
        artifacts: List[str] | None = None,
    ) -> MissionCheckpoint:
        """Append a checkpoint event; requires the 'checkpoint' capability."""
        self._require(sender, "checkpoint")
        event = MissionCheckpoint(
            sender=sender,
            summary=summary,
            state=state,
            artifacts=list(artifacts or []),
            sequence=self._next_sequence(),
        )
        self.events.append(event)
        return event

    def _get_checkpoint(self, checkpoint_id: str) -> MissionCheckpoint:
        """Linear scan of the stream for a checkpoint; KeyError if absent."""
        for event in self.events:
            if isinstance(event, MissionCheckpoint) and event.checkpoint_id == checkpoint_id:
                return event
        raise KeyError(f"Unknown checkpoint: {checkpoint_id}")

    def _get_handoff(self, handoff_id: str) -> MissionHandoff:
        """Linear scan of the stream for a handoff; KeyError if absent."""
        for event in self.events:
            if isinstance(event, MissionHandoff) and event.handoff_id == handoff_id:
                return event
        raise KeyError(f"Unknown handoff: {handoff_id}")

    def handoff(self, sender: str, recipient: str, checkpoint_id: str, note: str = "") -> MissionHandoff:
        """Append a handoff event; requires the 'handoff' capability.

        Validates the recipient is registered and the checkpoint exists
        before recording the event (both raise KeyError otherwise).
        """
        self._require(sender, "handoff")
        if recipient not in self.participants:
            raise KeyError(f"Unknown recipient: {recipient}")
        self._get_checkpoint(checkpoint_id)
        event = MissionHandoff(
            sender=sender,
            recipient=recipient,
            checkpoint_id=checkpoint_id,
            note=note,
            sequence=self._next_sequence(),
        )
        self.events.append(event)
        return event

    def build_resume_packet(self, handoff_id: str) -> Dict[str, Any]:
        """Assemble everything the receiving agent needs to resume from a handoff."""
        handoff = self._get_handoff(handoff_id)
        checkpoint = self._get_checkpoint(handoff.checkpoint_id)
        return {
            "mission_id": self.mission_id,
            "title": self.title,
            "recipient": handoff.recipient,
            "sender": handoff.sender,
            "handoff_note": handoff.note,
            "checkpoint": checkpoint.to_dict(),
            "participants": {name: participant.to_dict() for name, participant in self.participants.items()},
            "isolation_profiles": [profile.to_dict() for profile in self.isolation_profiles],
            "stream_length": len(self.events),
        }

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the whole bus (config, participants, events) to plain dicts."""
        return {
            "mission_id": self.mission_id,
            "title": self.title,
            "config": self.config,
            "participants": {name: participant.to_dict() for name, participant in self.participants.items()},
            "events": [event.to_dict() for event in self.events],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MissionBus":
        """Rebuild a bus from to_dict() output, restoring participants and events."""
        bus = cls(data["mission_id"], title=data.get("title", ""), config=data.get("config"))
        for name, participant_data in data.get("participants", {}).items():
            bus.participants[name] = MissionParticipant.from_dict(participant_data)
        bus.events = [event_from_dict(event_data) for event_data in data.get("events", [])]
        return bus

View File

@@ -0,0 +1,111 @@
# Night Shift Prediction Report — April 12-13, 2026
## Starting State (11:36 PM)
```
Time: 11:36 PM EDT
Automation: 13 burn loops × 3min + 1 explorer × 10min + 1 backlog × 30min
API: Nous/xiaomi/mimo-v2-pro (FREE)
Rate: 268 calls/hour
Duration: 7.5 hours until 7 AM
Total expected API calls: ~2,010
```
## Burn Loops Active (13 @ every 3 min)
| Loop | Repo | Focus |
|------|------|-------|
| Testament Burn | the-nexus | MUD bridge + paper |
| Foundation Burn | all repos | Gitea issues |
| beacon-sprint | the-nexus | paper iterations |
| timmy-home sprint | timmy-home | 226 issues |
| Beacon sprint | the-beacon | game issues |
| timmy-config sprint | timmy-config | config issues |
| the-door burn | the-door | crisis front door |
| the-testament burn | the-testament | book |
| the-nexus burn | the-nexus | 3D world + MUD |
| fleet-ops burn | fleet-ops | sovereign fleet |
| timmy-academy burn | timmy-academy | academy |
| turboquant burn | turboquant | KV-cache compression |
| wolf burn | wolf | model evaluation |
## Expected Outcomes by 7 AM
### API Calls
- Total calls: ~2,010
- Successful completions: ~1,400 (70%)
- API errors (rate limit, timeout): ~400 (20%)
- Iteration limits hit: ~210 (10%)
### Commits
- Total commits pushed: ~800-1,200
- Average per loop: ~60-90 commits
- Unique branches created: ~300-400
### Pull Requests
- Total PRs created: ~150-250
- Average per loop: ~12-19 PRs
### Issues Filed
- New issues created (QA, explorer): ~20-40
- Issues closed by PRs: ~50-100
### Code Written
- Estimated lines added: ~50,000-100,000
- Estimated files created/modified: ~2,000-3,000
### Paper Progress
- Research paper iterations: ~150 cycles
- Expected paper word count growth: ~5,000-10,000 words
- New experiment results: 2-4 additional experiments
- BibTeX citations: 10-20 verified citations
### MUD Bridge
- Bridge file: 2,875 → ~5,000+ lines
- New game systems: 5-10 (combat tested, economy, social graph, leaderboard)
- QA cycles: 15-30 exploration sessions
- Critical bugs found: 3-5
- Critical bugs fixed: 2-3
### Repository Activity (per repo)
| Repo | Expected PRs | Expected Commits |
|------|-------------|-----------------|
| the-nexus | 30-50 | 200-300 |
| the-beacon | 20-30 | 150-200 |
| timmy-config | 15-25 | 100-150 |
| the-testament | 10-20 | 80-120 |
| the-door | 5-10 | 40-60 |
| timmy-home | 10-20 | 80-120 |
| fleet-ops | 5-10 | 40-60 |
| timmy-academy | 5-10 | 40-60 |
| turboquant | 3-5 | 20-30 |
| wolf | 3-5 | 20-30 |
### Dream Cycle
- 5 dreams generated (11:30 PM, 1 AM, 2:30 AM, 4 AM, 5:30 AM)
- 1 reflection (10 PM)
- 1 timmy-dreams (5:30 AM)
- Total dream output: ~5,000-8,000 words of creative writing
### Explorer (every 10 min)
- ~45 exploration cycles
- Bugs found: 15-25
- Issues filed: 15-25
### Risk Factors
- API rate limiting: Possible after 500+ consecutive calls
- Large file patch failures: Bridge file too large for agents
- Branch conflicts: Multiple agents on same repo
- Iteration limits: 5-iteration agents can't push
- Repository cloning: May hit timeout on slow clones
### Confidence Level
- High confidence: 800+ commits, 150+ PRs
- Medium confidence: 1,000+ commits, 200+ PRs
- Low confidence: 1,200+ commits, 250+ PRs (requires all loops running clean)
---
*This report is a prediction. The 7 AM morning report will compare actual results.*
*Generated: 2026-04-12 23:36 EDT*
*Author: Timmy (pre-shift prediction)*

107
tests/test_mission_bus.py Normal file
View File

@@ -0,0 +1,107 @@
from importlib import util
from pathlib import Path
import sys
import pytest
ROOT = Path(__file__).resolve().parent.parent
MODULE_PATH = ROOT / "nexus" / "mission_bus.py"
CONFIG_PATH = ROOT / "config" / "mission_bus_profiles.json"
def load_module():
    """Load nexus/mission_bus.py straight from its source file.

    Registers the module in sys.modules before exec_module so machinery that
    looks the module up by name (e.g. dataclass internals) can resolve it.
    """
    spec = util.spec_from_file_location("mission_bus", MODULE_PATH)
    module = util.module_from_spec(spec)
    assert spec.loader is not None
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module
def build_bus(module):
    """Create a mission bus from the JSON config with one participant per role."""
    profiles = module.load_profiles(CONFIG_PATH)
    bus = module.MissionBus("mission-883", title="multi-agent teaming", config=profiles)
    bus.register_participant("timmy", module.MissionRole.LEAD)
    bus.register_participant("ezra", module.MissionRole.WRITE)
    bus.register_participant("bezalel", module.MissionRole.READ)
    bus.register_participant("allegro", module.MissionRole.AUDIT)
    return bus
def test_role_permissions_gate_publish_checkpoint_and_handoff():
    """Capabilities follow roles: read/audit roles cannot publish or checkpoint."""
    module = load_module()
    bus = build_bus(module)
    assert bus.allowed("timmy", "publish") is True
    assert bus.allowed("ezra", "handoff") is True
    assert bus.allowed("allegro", "audit") is True
    assert bus.allowed("bezalel", "publish") is False
    with pytest.raises(PermissionError):
        bus.publish("bezalel", "mission.notes", {"text": "should fail"})
    with pytest.raises(PermissionError):
        bus.create_checkpoint("allegro", summary="audit cannot checkpoint", state={})
def test_mission_bus_unified_stream_records_messages_checkpoints_and_handoffs():
    """All three event kinds land on one stream with 1-based sequence numbers."""
    module = load_module()
    bus = build_bus(module)
    msg = bus.publish("timmy", "mission.start", {"goal": "build the slice"})
    checkpoint = bus.create_checkpoint(
        "ezra",
        summary="checkpoint before lead review",
        state={"branch": "fix/883", "files": ["nexus/mission_bus.py"]},
        artifacts=["docs/mission-bus.md"],
    )
    handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="ready for lead review")
    assert [event.event_type for event in bus.events] == ["message", "checkpoint", "handoff"]
    assert [event.sequence for event in bus.events] == [1, 2, 3]
    assert msg.topic == "mission.start"
    assert handoff.recipient == "timmy"
def test_handoff_resume_packet_contains_checkpoint_state_and_participants():
    """A resume packet carries the checkpoint state, artifacts, roles, and note."""
    module = load_module()
    bus = build_bus(module)
    checkpoint = bus.create_checkpoint(
        "ezra",
        summary="handoff package",
        state={"branch": "fix/883", "tests": ["tests/test_mission_bus.py"]},
        artifacts=["config/mission_bus_profiles.json"],
    )
    handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="pick up from here")
    packet = bus.build_resume_packet(handoff.handoff_id)
    assert packet["recipient"] == "timmy"
    assert packet["checkpoint"]["state"]["branch"] == "fix/883"
    assert packet["checkpoint"]["artifacts"] == ["config/mission_bus_profiles.json"]
    assert packet["participants"]["ezra"]["role"] == "write"
    assert packet["handoff_note"] == "pick up from here"
def test_profiles_define_level2_mount_namespace_and_level3_rootless_podman():
    """The shipped JSON config declares the Level 2/3 mechanisms and audit role."""
    module = load_module()
    profiles = module.load_profiles(CONFIG_PATH)
    levels = {entry["level"]: entry["mechanism"] for entry in profiles["isolation_profiles"]}
    assert levels[2] == "mount_namespace"
    assert levels[3] == "rootless_podman"
    assert profiles["roles"]["audit"] == ["read", "audit"]
def test_mission_bus_roundtrip_preserves_events_and_isolation_profile():
    """to_dict()/from_dict() round-trip keeps events, notes, and profiles intact."""
    module = load_module()
    bus = build_bus(module)
    bus.publish("timmy", "mission.start", {"goal": "roundtrip"})
    checkpoint = bus.create_checkpoint("ezra", summary="save state", state={"count": 1})
    bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume")
    restored = module.MissionBus.from_dict(bus.to_dict())
    assert restored.mission_id == "mission-883"
    assert restored.events[-1].event_type == "handoff"
    assert restored.events[-1].note == "resume"
    assert restored.isolation_profiles[1].mechanism == "mount_namespace"

View File

@@ -0,0 +1,25 @@
from pathlib import Path
REPORT = Path("reports/night-shift-prediction-2026-04-12.md")
def test_prediction_report_exists_with_required_sections():
    """The report file exists and keeps all of its required section headings."""
    assert REPORT.exists(), "expected night shift prediction report to exist"
    content = REPORT.read_text()
    assert "# Night Shift Prediction Report — April 12-13, 2026" in content
    assert "## Starting State (11:36 PM)" in content
    assert "## Burn Loops Active (13 @ every 3 min)" in content
    assert "## Expected Outcomes by 7 AM" in content
    assert "### Risk Factors" in content
    assert "### Confidence Level" in content
    assert "This report is a prediction" in content
def test_prediction_report_preserves_core_forecast_numbers():
    """The headline forecast figures must survive edits to the report verbatim."""
    content = REPORT.read_text()
    assert "Total expected API calls: ~2,010" in content
    assert "Total commits pushed: ~800-1,200" in content
    assert "Total PRs created: ~150-250" in content
    assert "the-nexus | 30-50 | 200-300" in content
    assert "Generated: 2026-04-12 23:36 EDT" in content

View File

@@ -1,121 +0,0 @@
"""Tests for WebSocket load testing infrastructure (issue #1505)."""
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
from unittest.mock import patch
PROJECT_ROOT = Path(__file__).parent.parent
# Load bin/ws_load_test.py directly from its file path under a dedicated name.
_spec = importlib.util.spec_from_file_location(
    "ws_load_test_test",
    PROJECT_ROOT / "bin" / "ws_load_test.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register before exec so patch("ws_load_test_test....") can resolve the module.
sys.modules["ws_load_test_test"] = _mod
_spec.loader.exec_module(_mod)
# Convenience aliases for the functions under test.
main = _mod.main
measure_memory = _mod.measure_memory
run_load_test = _mod.run_load_test
write_report = _mod.write_report
class TestMemoryStats:
    """Unit tests for measure_memory()."""

    # Decorators apply bottom-up: getrusage mock is the first injected argument.
    @patch("ws_load_test_test.tracemalloc.get_traced_memory", return_value=(1024, 4096))
    @patch("ws_load_test_test.resource.getrusage")
    def test_measure_memory_reports_tracemalloc_and_rss_bytes(self, mock_getrusage, _mock_tracemalloc):
        """On a non-darwin platform ru_maxrss must be scaled from KB to bytes."""
        mock_getrusage.return_value = type("Usage", (), {"ru_maxrss": 512})()
        with patch.object(_mod.sys, "platform", "linux"):
            stats = measure_memory()
        assert stats["tracemalloc_current_bytes"] == 1024
        assert stats["tracemalloc_peak_bytes"] == 4096
        assert stats["rss_bytes"] == 512 * 1024
class TestRunLoadTest:
    """Behavioral tests for run_load_test() using an injected fake connector."""

    def test_run_load_test_reports_concurrency_and_messages(self):
        """3 connections x 2 rounds should yield 6 attempts, 6 sends, 100% success."""
        events = []

        class FakeConnection:
            # Minimal async context manager standing in for websockets.connect().
            def __init__(self, url):
                self.url = url

            async def __aenter__(self):
                events.append(("enter", self.url))
                return self

            async def __aexit__(self, exc_type, exc, tb):
                events.append(("exit", self.url))

            async def send(self, payload):
                events.append(("send", payload))

        async def exercise():
            return await run_load_test(
                url="ws://nexus.local:8765",
                concurrency=3,
                rounds=2,
                hold_seconds=0,
                payload={"type": "probe"},
                connector=lambda url: FakeConnection(url),
            )

        report = _mod.asyncio.run(exercise())
        assert report["url"] == "ws://nexus.local:8765"
        assert report["concurrency"] == 3
        assert report["rounds"] == 2
        assert report["attempted_connections"] == 6
        assert report["successful_connections"] == 6
        assert report["failed_connections"] == 0
        assert report["messages_sent"] == 6
        assert report["success_rate"] == 1.0
        assert "memory_before" in report
        assert "memory_after" in report
        assert len([e for e in events if e[0] == "send"]) == 6
class TestReportOutput:
    """Tests for report serialization and the CLI --output path."""

    def test_write_report_serializes_json(self, tmp_path):
        """write_report() output must round-trip through json.loads unchanged."""
        out = tmp_path / "ws-load-report.json"
        payload = {"successful_connections": 4, "failed_connections": 0}
        write_report(out, payload)
        assert json.loads(out.read_text()) == payload

    def test_main_writes_output_file(self, tmp_path):
        """main() with --output writes the run_load_test report to the given file."""
        out = tmp_path / "report.json"
        fake_report = {
            "url": "ws://127.0.0.1:8765",
            "concurrency": 4,
            "rounds": 2,
            "attempted_connections": 8,
            "successful_connections": 8,
            "failed_connections": 0,
            "messages_sent": 0,
            "success_rate": 1.0,
            "memory_before": {"rss_bytes": 1, "tracemalloc_current_bytes": 2, "tracemalloc_peak_bytes": 3},
            "memory_after": {"rss_bytes": 4, "tracemalloc_current_bytes": 5, "tracemalloc_peak_bytes": 6},
        }

        # Stub out the live load test so main() exercises only arg parsing + I/O.
        async def fake_run_load_test(**_kwargs):
            return fake_report

        with patch.object(_mod, "run_load_test", fake_run_load_test):
            rc = main([
                "--url",
                "ws://127.0.0.1:8765",
                "--concurrency",
                "4",
                "--rounds",
                "2",
                "--output",
                str(out),
            ])
        assert rc == 0
        assert json.loads(out.read_text()) == fake_report

View File

@@ -1,347 +0,0 @@
#!/usr/bin/env python3
"""
WebSocket Load Test for The Nexus Gateway (#1505).
Simulates concurrent WebSocket connections to measure:
- Connection success rate
- Message broadcast latency
- Memory usage under load
- Throughput (messages/second)
Usage:
python3 tests/ws_load_test.py [--connections 50] [--duration 30] [--messages 100]
Requirements:
pip install websockets psutil (psutil optional, for memory tracking)
"""
import argparse
import asyncio
import json
import os
import statistics
import sys
import time
from dataclasses import dataclass, field
from typing import List, Optional
try:
import websockets
except ImportError:
print("ERROR: websockets required. Install: pip install websockets")
sys.exit(1)
try:
import psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Defaults for the CLI flags; override with --url / --connections /
# --duration / --messages (see main()).
DEFAULT_URL = "ws://localhost:8765"
DEFAULT_CONNECTIONS = 50
DEFAULT_DURATION = 30  # seconds
DEFAULT_MESSAGES = 100  # per connection
@dataclass
class ConnectionStats:
    """Mutable per-connection counters filled in by connection_worker.

    Attributes:
        connected: True once the websocket handshake succeeded.
        connect_time_ms: Handshake latency in milliseconds.
        messages_sent: Messages successfully handed to the socket.
        messages_received: Echo/broadcast responses received back.
        errors: Send/receive or connection failures observed.
        latencies_ms: Round-trip latency samples in milliseconds.
    """
    connected: bool = False
    # Defaults are float literals so the fields match their annotations
    # even before any measurement is recorded.
    connect_time_ms: float = 0.0
    messages_sent: int = 0
    messages_received: int = 0
    errors: int = 0
    latencies_ms: List[float] = field(default_factory=list)
@dataclass
class LoadTestResult:
    """Aggregated results from a load test run.

    Produced by run_load_test() and rendered by print_report().
    Latency percentiles stay 0.0 when no responses were received.
    """
    total_connections: int = 0
    successful_connections: int = 0
    failed_connections: int = 0
    total_messages_sent: int = 0
    total_messages_received: int = 0
    total_errors: int = 0
    # Float fields default to float literals so values match their
    # annotations even when a section of the test produced no data.
    duration_seconds: float = 0.0
    messages_per_second: float = 0.0
    avg_latency_ms: float = 0.0
    p50_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    p99_latency_ms: float = 0.0
    memory_start_mb: float = 0.0
    memory_end_mb: float = 0.0
    memory_delta_mb: float = 0.0
# ---------------------------------------------------------------------------
# Single connection worker
# ---------------------------------------------------------------------------
async def connection_worker(
    worker_id: int,
    url: str,
    num_messages: int,
    stats: ConnectionStats,
    stop_event: asyncio.Event,
):
    """A single WebSocket connection that sends and receives messages.

    Args:
        worker_id: Index of this worker, embedded in every payload.
        url: WebSocket endpoint to connect to.
        num_messages: Maximum number of messages to send.
        stats: Per-connection counters, updated in place.
        stop_event: Cooperative shutdown signal set by the coordinator.
    """
    try:
        start = time.perf_counter()
        async with websockets.connect(url, open_timeout=5) as ws:
            stats.connect_time_ms = (time.perf_counter() - start) * 1000
            stats.connected = True
            # Send messages with timestamps for latency measurement.
            for i in range(num_messages):
                if stop_event.is_set():
                    break
                msg = json.dumps({
                    "type": "load_test",
                    "worker_id": worker_id,
                    "seq": i,
                    "timestamp": time.time(),
                })
                try:
                    send_time = time.perf_counter()
                    await ws.send(msg)
                    stats.messages_sent += 1
                    # Wait for echo/broadcast back (with timeout).
                    try:
                        await asyncio.wait_for(ws.recv(), timeout=2.0)
                        latency_ms = (time.perf_counter() - send_time) * 1000
                        stats.latencies_ms.append(latency_ms)
                        stats.messages_received += 1
                    except asyncio.TimeoutError:
                        pass  # No response is OK for broadcast servers.
                except Exception:
                    stats.errors += 1
                # Small delay between messages to avoid overwhelming the server.
                await asyncio.sleep(0.01)
    except Exception:
        # Record the failure but do NOT reset stats.connected: a connection
        # that completed its handshake and died mid-test was still
        # established, and resetting the flag here misclassified it as a
        # failed connection in the aggregate report.
        stats.errors += 1
# ---------------------------------------------------------------------------
# Broadcast receiver (separate connection that only receives)
# ---------------------------------------------------------------------------
async def broadcast_receiver(
    url: str,
    received_count: dict,
    stop_event: asyncio.Event,
):
    """A connection that only receives broadcasts to measure fan-out.

    Increments received_count["total"] for each message until stop_event
    is set; connection problems are ignored (best-effort observer).
    """
    try:
        async with websockets.connect(url, open_timeout=5) as ws:
            while not stop_event.is_set():
                try:
                    await asyncio.wait_for(ws.recv(), timeout=1.0)
                except asyncio.TimeoutError:
                    # Idle second with no broadcast; re-check the stop flag.
                    continue
                received_count["total"] += 1
    except Exception:
        # Observer is best-effort: any failure simply ends the count.
        pass
# ---------------------------------------------------------------------------
# Memory monitoring
# ---------------------------------------------------------------------------
def get_memory_mb() -> float:
    """Return the current process RSS in MB, or 0.0 when psutil is absent."""
    if not HAS_PSUTIL:
        return 0.0
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes / (1024 * 1024)
# ---------------------------------------------------------------------------
# Main load test
# ---------------------------------------------------------------------------
async def run_load_test(
    url: str,
    num_connections: int,
    duration: int,
    messages_per_connection: int,
) -> LoadTestResult:
    """Run the WebSocket load test.

    Spawns ``num_connections`` concurrent sender workers plus one
    receive-only connection, lets them run for ``duration`` seconds,
    then signals stop and aggregates the per-connection stats.

    Args:
        url: WebSocket endpoint under test.
        num_connections: Number of concurrent sender connections.
        duration: Wall-clock seconds to run before signalling stop.
        messages_per_connection: Max messages each worker may send.

    Returns:
        LoadTestResult with connection counts, throughput, latency
        percentiles, and memory usage before/after the run.
    """
    result = LoadTestResult()
    result.total_connections = num_connections
    result.duration_seconds = duration  # provisional; replaced by measured time below
    memory_start = get_memory_mb()
    result.memory_start_mb = memory_start
    stats_list: List[ConnectionStats] = [ConnectionStats() for _ in range(num_connections)]
    stop_event = asyncio.Event()
    # Also add a broadcast receiver to measure fan-out.
    # NOTE(review): broadcast_count is collected but never folded into the
    # returned result — confirm whether fan-out reporting was intended.
    broadcast_count = {"total": 0}
    print(f"\n{'='*60}")
    print(f"WebSocket Load Test")
    print(f"{'='*60}")
    print(f" URL: {url}")
    print(f" Connections: {num_connections}")
    print(f" Duration: {duration}s")
    print(f" Messages/connection: {messages_per_connection}")
    print(f"{'='*60}\n")
    # Start timer
    test_start = time.perf_counter()
    # Create all connection tasks
    tasks = []
    for i in range(num_connections):
        task = asyncio.create_task(
            connection_worker(i, url, messages_per_connection, stats_list[i], stop_event)
        )
        tasks.append(task)
    # Add broadcast receiver
    recv_task = asyncio.create_task(broadcast_receiver(url, broadcast_count, stop_event))
    # Wait for duration, then signal stop
    print(f"Running load test for {duration} seconds...")
    await asyncio.sleep(duration)
    stop_event.set()
    # Wait for all tasks to finish; return_exceptions=True so one failed
    # worker cannot abort the aggregation of the others.
    await asyncio.gather(*tasks, return_exceptions=True)
    await asyncio.gather(recv_task, return_exceptions=True)
    test_end = time.perf_counter()
    actual_duration = test_end - test_start
    # Aggregate results across all workers.
    all_latencies = []
    for stats in stats_list:
        if stats.connected:
            result.successful_connections += 1
        else:
            result.failed_connections += 1
        result.total_messages_sent += stats.messages_sent
        result.total_messages_received += stats.messages_received
        result.total_errors += stats.errors
        all_latencies.extend(stats.latencies_ms)
    result.duration_seconds = actual_duration
    result.messages_per_second = result.total_messages_sent / actual_duration if actual_duration > 0 else 0
    if all_latencies:
        # Percentiles via index into the sorted samples; indices stay
        # in range because int(n * 0.99) < n for any n >= 1.
        result.avg_latency_ms = statistics.mean(all_latencies)
        sorted_latencies = sorted(all_latencies)
        result.p50_latency_ms = sorted_latencies[len(sorted_latencies) // 2]
        result.p95_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.95)]
        result.p99_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.99)]
    result.memory_end_mb = get_memory_mb()
    result.memory_delta_mb = result.memory_end_mb - result.memory_start_mb
    return result
# ---------------------------------------------------------------------------
# Report
# ---------------------------------------------------------------------------
def print_report(result: LoadTestResult) -> bool:
    """Print load test results and return True when the run passed.

    Verdict criteria: no failed connections, error rate at most 1% of
    messages sent, and P95 latency at most 500 ms.

    Args:
        result: Aggregated stats produced by run_load_test().

    Returns:
        True on PASS, False on FAIL.
    """
    print(f"\n{'='*60}")
    print(f"Load Test Results")
    print(f"{'='*60}")
    print(f"\n--- Connections ---")
    print(f" Total: {result.total_connections}")
    print(f" Successful: {result.successful_connections}")
    print(f" Failed: {result.failed_connections}")
    conn_rate = result.successful_connections / result.total_connections * 100 if result.total_connections else 0
    print(f" Success rate: {conn_rate:.1f}%")
    print(f"\n--- Messages ---")
    print(f" Sent: {result.total_messages_sent}")
    print(f" Received: {result.total_messages_received}")
    print(f" Errors: {result.total_errors}")
    print(f" Throughput: {result.messages_per_second:.1f} msg/s")
    print(f"\n--- Latency ---")
    if result.avg_latency_ms > 0:
        print(f" Average: {result.avg_latency_ms:.2f} ms")
        print(f" P50: {result.p50_latency_ms:.2f} ms")
        print(f" P95: {result.p95_latency_ms:.2f} ms")
        print(f" P99: {result.p99_latency_ms:.2f} ms")
    else:
        print(f" No latency data (server may not echo)")
    print(f"\n--- Memory ---")
    if HAS_PSUTIL:
        print(f" Start: {result.memory_start_mb:.1f} MB")
        print(f" End: {result.memory_end_mb:.1f} MB")
        print(f" Delta: {result.memory_delta_mb:+.1f} MB")
    else:
        print(f" psutil not installed — memory tracking disabled")
    print(f"\n--- Duration ---")
    print(f" {result.duration_seconds:.1f} seconds")
    print(f"\n{'='*60}")
    # Pass/fail verdict
    issues = []
    if result.failed_connections > 0:
        issues.append(f"{result.failed_connections} connections failed")
    if result.total_errors > result.total_messages_sent * 0.01:
        # Guard against ZeroDivisionError: when nothing was sent but errors
        # occurred (e.g. every connection failed before the first send),
        # the original division by total_messages_sent crashed here.
        if result.total_messages_sent:
            error_pct = result.total_errors / result.total_messages_sent * 100
        else:
            error_pct = 100.0
        issues.append(f"Error rate {error_pct:.1f}% exceeds 1%")
    if result.p95_latency_ms > 500:
        issues.append(f"P95 latency {result.p95_latency_ms:.0f}ms exceeds 500ms")
    if issues:
        print(f"ISSUES FOUND:")
        for issue in issues:
            print(f"{issue}")
        print(f"\nVERDICT: FAIL")
        return False
    else:
        print(f"VERDICT: PASS")
        return True
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: parse flags, run the load test, exit 0/1 on pass/fail."""
    parser = argparse.ArgumentParser(description="WebSocket load test for The Nexus")
    parser.add_argument(
        "--url",
        default=DEFAULT_URL,
        help=f"WebSocket URL (default: {DEFAULT_URL})",
    )
    parser.add_argument(
        "--connections",
        type=int,
        default=DEFAULT_CONNECTIONS,
        help=f"Number of concurrent connections (default: {DEFAULT_CONNECTIONS})",
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=DEFAULT_DURATION,
        help=f"Test duration in seconds (default: {DEFAULT_DURATION})",
    )
    parser.add_argument(
        "--messages",
        type=int,
        default=DEFAULT_MESSAGES,
        help=f"Messages per connection (default: {DEFAULT_MESSAGES})",
    )
    args = parser.parse_args()

    outcome = asyncio.run(
        run_load_test(
            url=args.url,
            num_connections=args.connections,
            duration=args.duration,
            messages_per_connection=args.messages,
        )
    )
    passed = print_report(outcome)
    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()