Compare commits
10 Commits
mimo/code/
...
fix/912
| Author | SHA1 | Date | |
|---|---|---|---|
| 41262ac350 | |||
| d1f6421c49 | |||
| 8d87dba309 | |||
| 9322742ef8 | |||
| 157f6f322d | |||
| 2978f48a6a | |||
| 4cb7d3e084 | |||
|
|
3fed634955 | ||
|
|
168cbb57c9 | ||
|
|
b79805118e |
54
config/zero_touch_forge.json
Normal file
54
config/zero_touch_forge.json
Normal file
@@ -0,0 +1,54 @@
|
||||
{
|
||||
"epic_issue": 912,
|
||||
"title": "The Zero-Touch Forge: Bare-Metal Fleet Bootstrap in 60 Minutes",
|
||||
"checks": [
|
||||
{
|
||||
"id": "os_bootstrap",
|
||||
"label": "OS bootstrap foothold",
|
||||
"required_files": ["scripts/provision-runner.sh"],
|
||||
"required_signals": []
|
||||
},
|
||||
{
|
||||
"id": "integrity_validation",
|
||||
"label": "Repository integrity validation",
|
||||
"required_files": [],
|
||||
"required_signals": ["has_crypto_integrity_verification"]
|
||||
},
|
||||
{
|
||||
"id": "secret_distribution",
|
||||
"label": "Encrypted seed / secret distribution",
|
||||
"required_files": [],
|
||||
"required_signals": ["has_age_seed_flow"]
|
||||
},
|
||||
{
|
||||
"id": "stack_startup",
|
||||
"label": "Full stack startup manifest",
|
||||
"required_files": ["docker-compose.yml", "fleet/fleet-routing.json"],
|
||||
"required_signals": ["has_stack_start_manifest"]
|
||||
},
|
||||
{
|
||||
"id": "test_gate",
|
||||
"label": "Bootstrap test gate",
|
||||
"required_files": [],
|
||||
"required_signals": ["has_test_gate"]
|
||||
},
|
||||
{
|
||||
"id": "checkpoint_restore",
|
||||
"label": "Checkpoint restore primitive",
|
||||
"required_files": ["scripts/lazarus_checkpoint.py"],
|
||||
"required_signals": []
|
||||
},
|
||||
{
|
||||
"id": "post_boot_notification",
|
||||
"label": "Post-boot notify Alexander only-after-healthy",
|
||||
"required_files": [],
|
||||
"required_signals": ["has_notification_step"]
|
||||
},
|
||||
{
|
||||
"id": "sixty_minute_sla",
|
||||
"label": "60-minute end-to-end timing budget",
|
||||
"required_files": [],
|
||||
"required_signals": ["has_sla_budget"]
|
||||
}
|
||||
]
|
||||
}
|
||||
51
docs/zero-touch-forge-readiness.md
Normal file
51
docs/zero-touch-forge-readiness.md
Normal file
@@ -0,0 +1,51 @@
|
||||
# Zero-Touch Forge Readiness
|
||||
|
||||
Epic: #912 — The Zero-Touch Forge: Bare-Metal Fleet Bootstrap in 60 Minutes
|
||||
|
||||
## Impossible Goal
|
||||
|
||||
Take a raw VPS plus only a git URL and encrypted seed, then bootstrap a full Timmy Foundation fleet in under 60 minutes with no human intervention after trigger.
|
||||
|
||||
This document does **not** claim the goal is solved. It grounds the epic in the current repo state.
|
||||
|
||||
Current primitive readiness: 2 ready / 6 blocked.
|
||||
|
||||
## Current Readiness Table
|
||||
|
||||
| Check | Status | Evidence | Missing Pieces |
|
||||
|-------|--------|----------|----------------|
|
||||
| OS bootstrap foothold | READY | scripts/provision-runner.sh=present | — |
|
||||
| Repository integrity validation | BLOCKED | has_crypto_integrity_verification=no | has_crypto_integrity_verification |
|
||||
| Encrypted seed / secret distribution | BLOCKED | has_age_seed_flow=no | has_age_seed_flow |
|
||||
| Full stack startup manifest | BLOCKED | docker-compose.yml=present, fleet/fleet-routing.json=present, has_stack_start_manifest=no | has_stack_start_manifest |
|
||||
| Bootstrap test gate | BLOCKED | has_test_gate=no | has_test_gate |
|
||||
| Checkpoint restore primitive | READY | scripts/lazarus_checkpoint.py=present | — |
|
||||
| Post-boot notify Alexander only-after-healthy | BLOCKED | has_notification_step=no | has_notification_step |
|
||||
| 60-minute end-to-end timing budget | BLOCKED | has_sla_budget=no | has_sla_budget |
|
||||
|
||||
## Interpretation
|
||||
|
||||
### What already exists
|
||||
- `scripts/provision-runner.sh` proves we already automate part of bare-metal service bootstrap.
|
||||
- `scripts/lazarus_checkpoint.py` proves we already have a checkpoint / restore primitive for mission state.
|
||||
- `docker-compose.yml`, `fleet/fleet-routing.json`, `operations/fleet-topology.md`, and `config/fleet_agents.json` show a real fleet shape, not just a philosophical wish.
|
||||
|
||||
### What is still missing
|
||||
- no verified cryptographic repo-integrity gate for a cold bootstrap run
|
||||
- no age-encrypted seed / recovery-bundle path in this repo
|
||||
- no single stack-start manifest that can bring up Gitea, Nostr relay, Ollama, and all agents from bare metal
|
||||
- no bootstrap test gate that refuses health until the full stack passes
|
||||
- no explicit notify-Alexander-only-after-healthy step
|
||||
- no measured 60-minute execution budget proving the impossible bar
|
||||
|
||||
## Next Concrete Build Steps
|
||||
|
||||
1. Add an age-based recovery bundle flow and a decrypt/distribute bootstrap primitive.
|
||||
2. Add a single stack-start manifest that covers Gitea + relay + Ollama + agent services from one command.
|
||||
3. Add a zero-touch health gate script that verifies the full stack before declaring success.
|
||||
4. Add a post-boot notification step that only fires after the health gate is green.
|
||||
5. Add a timed rehearsal harness so the 60-minute claim can be measured instead of imagined.
|
||||
|
||||
## Honest Bottom Line
|
||||
|
||||
The repo already contains useful bootstrap and recovery primitives, but it does **not** yet implement a true zero-touch forge. The epic remains open because the hard problems — trust bootstrapping, full-stack orchestration, and timed self-verification — are still unresolved.
|
||||
187
scripts/zero_touch_forge_readiness.py
Normal file
187
scripts/zero_touch_forge_readiness.py
Normal file
@@ -0,0 +1,187 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Zero-Touch Forge readiness grounding for epic #912.
|
||||
|
||||
This does not pretend the impossible goal is solved.
|
||||
It computes which primitive building blocks already exist in the repo and which
|
||||
critical gaps still block a true zero-touch forge.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
SPEC_PATH = REPO_ROOT / "config" / "zero_touch_forge.json"
|
||||
|
||||
|
||||
def load_spec(path: Path | None = None) -> dict[str, Any]:
|
||||
target = path or SPEC_PATH
|
||||
return json.loads(target.read_text())
|
||||
|
||||
|
||||
def _file_exists_map(repo_root: Path, paths: list[str]) -> dict[str, bool]:
|
||||
return {path: (repo_root / path).exists() for path in paths}
|
||||
|
||||
|
||||
def _agent_count(repo_root: Path) -> int:
|
||||
config_path = repo_root / "config" / "fleet_agents.json"
|
||||
if not config_path.exists():
|
||||
return 0
|
||||
try:
|
||||
payload = json.loads(config_path.read_text())
|
||||
return len(payload.get("agents") or [])
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def derive_signal_flags(repo_root: Path | None = None) -> dict[str, bool]:
|
||||
root = repo_root or REPO_ROOT
|
||||
agent_count = _agent_count(root)
|
||||
return {
|
||||
"has_age_seed_flow": False,
|
||||
"has_crypto_integrity_verification": False,
|
||||
"has_stack_start_manifest": agent_count >= 5,
|
||||
"has_test_gate": False,
|
||||
"has_notification_step": False,
|
||||
"has_sla_budget": False,
|
||||
}
|
||||
|
||||
|
||||
def _evidence_line(check: dict[str, Any], file_exists: dict[str, bool], signal_flags: dict[str, bool]) -> str:
|
||||
parts = []
|
||||
for path in check.get("required_files", []):
|
||||
parts.append(f"{path}={'present' if file_exists.get(path) else 'missing'}")
|
||||
for key in check.get("required_signals", []):
|
||||
parts.append(f"{key}={'yes' if signal_flags.get(key) else 'no'}")
|
||||
return ", ".join(parts) if parts else "no explicit evidence"
|
||||
|
||||
|
||||
def evaluate_readiness(
|
||||
spec: dict[str, Any],
|
||||
*,
|
||||
file_exists: dict[str, bool] | None = None,
|
||||
signal_flags: dict[str, bool] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
all_paths = []
|
||||
for check in spec["checks"]:
|
||||
all_paths.extend(check.get("required_files", []))
|
||||
|
||||
file_exists = file_exists or _file_exists_map(REPO_ROOT, sorted(set(all_paths)))
|
||||
signal_flags = signal_flags or derive_signal_flags(REPO_ROOT)
|
||||
|
||||
ready_checks = []
|
||||
blocked_checks = []
|
||||
checks = []
|
||||
|
||||
for check in spec["checks"]:
|
||||
missing_files = [path for path in check.get("required_files", []) if not file_exists.get(path, False)]
|
||||
missing_signals = [key for key in check.get("required_signals", []) if not signal_flags.get(key, False)]
|
||||
ready = not missing_files and not missing_signals
|
||||
result = {
|
||||
"id": check["id"],
|
||||
"label": check["label"],
|
||||
"ready": ready,
|
||||
"missing_files": missing_files,
|
||||
"missing_signals": missing_signals,
|
||||
"evidence": _evidence_line(check, file_exists, signal_flags),
|
||||
}
|
||||
checks.append(result)
|
||||
if ready:
|
||||
ready_checks.append(result)
|
||||
else:
|
||||
blocked_checks.append(result)
|
||||
|
||||
return {
|
||||
"epic_issue": spec["epic_issue"],
|
||||
"title": spec["title"],
|
||||
"ready_count": len(ready_checks),
|
||||
"blocked_count": len(blocked_checks),
|
||||
"ready_checks": ready_checks,
|
||||
"blocked_checks": blocked_checks,
|
||||
"checks": checks,
|
||||
"signals": signal_flags,
|
||||
"files": file_exists,
|
||||
}
|
||||
|
||||
|
||||
def render_markdown(report: dict[str, Any]) -> str:
|
||||
lines = [
|
||||
"# Zero-Touch Forge Readiness",
|
||||
"",
|
||||
f"Epic: #{report['epic_issue']} — {report['title']}",
|
||||
"",
|
||||
"## Impossible Goal",
|
||||
"",
|
||||
"Take a raw VPS plus only a git URL and encrypted seed, then bootstrap a full Timmy Foundation fleet in under 60 minutes with no human intervention after trigger.",
|
||||
"",
|
||||
"This document does **not** claim the goal is solved. It grounds the epic in the current repo state.",
|
||||
"",
|
||||
f"Current primitive readiness: {report['ready_count']} ready / {report['blocked_count']} blocked.",
|
||||
"",
|
||||
"## Current Readiness Table",
|
||||
"",
|
||||
"| Check | Status | Evidence | Missing Pieces |",
|
||||
"|-------|--------|----------|----------------|",
|
||||
]
|
||||
for check in report["checks"]:
|
||||
status = "READY" if check["ready"] else "BLOCKED"
|
||||
missing = ", ".join(check["missing_files"] + check["missing_signals"]) or "—"
|
||||
lines.append(f"| {check['label']} | {status} | {check['evidence']} | {missing} |")
|
||||
|
||||
lines.extend([
|
||||
"",
|
||||
"## Interpretation",
|
||||
"",
|
||||
"### What already exists",
|
||||
"- `scripts/provision-runner.sh` proves we already automate part of bare-metal service bootstrap.",
|
||||
"- `scripts/lazarus_checkpoint.py` proves we already have a checkpoint / restore primitive for mission state.",
|
||||
"- `docker-compose.yml`, `fleet/fleet-routing.json`, `operations/fleet-topology.md`, and `config/fleet_agents.json` show a real fleet shape, not just a philosophical wish.",
|
||||
"",
|
||||
"### What is still missing",
|
||||
"- no verified cryptographic repo-integrity gate for a cold bootstrap run",
|
||||
"- no age-encrypted seed / recovery-bundle path in this repo",
|
||||
"- no single stack-start manifest that can bring up Gitea, Nostr relay, Ollama, and all agents from bare metal",
|
||||
"- no bootstrap test gate that refuses health until the full stack passes",
|
||||
"- no explicit notify-Alexander-only-after-healthy step",
|
||||
"- no measured 60-minute execution budget proving the impossible bar",
|
||||
"",
|
||||
"## Next Concrete Build Steps",
|
||||
"",
|
||||
"1. Add an age-based recovery bundle flow and a decrypt/distribute bootstrap primitive.",
|
||||
"2. Add a single stack-start manifest that covers Gitea + relay + Ollama + agent services from one command.",
|
||||
"3. Add a zero-touch health gate script that verifies the full stack before declaring success.",
|
||||
"4. Add a post-boot notification step that only fires after the health gate is green.",
|
||||
"5. Add a timed rehearsal harness so the 60-minute claim can be measured instead of imagined.",
|
||||
"",
|
||||
"## Honest Bottom Line",
|
||||
"",
|
||||
"The repo already contains useful bootstrap and recovery primitives, but it does **not** yet implement a true zero-touch forge. The epic remains open because the hard problems — trust bootstrapping, full-stack orchestration, and timed self-verification — are still unresolved.",
|
||||
"",
|
||||
])
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="Evaluate repo readiness for the Zero-Touch Forge epic.")
|
||||
parser.add_argument("--json", action="store_true", help="Emit JSON instead of markdown")
|
||||
parser.add_argument("--out", type=Path, help="Optional output file")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
spec = load_spec()
|
||||
report = evaluate_readiness(spec)
|
||||
output = json.dumps(report, indent=2) if args.json else render_markdown(report)
|
||||
if args.out:
|
||||
args.out.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.out.write_text(output)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
118
server.py
118
server.py
@@ -3,20 +3,34 @@
|
||||
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
|
||||
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
|
||||
the body (Evennia/Morrowind), and the visualization surface.
|
||||
|
||||
Security features:
|
||||
- Binds to 127.0.0.1 by default (localhost only)
|
||||
- Optional external binding via NEXUS_WS_HOST environment variable
|
||||
- Token-based authentication via NEXUS_WS_TOKEN environment variable
|
||||
- Rate limiting on connections
|
||||
- Connection logging and monitoring
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from typing import Set
|
||||
import time
|
||||
from typing import Set, Dict, Optional
|
||||
from collections import defaultdict
|
||||
|
||||
# Branch protected file - see POLICY.md
|
||||
import websockets
|
||||
|
||||
# Configuration
|
||||
PORT = 8765
|
||||
HOST = "0.0.0.0" # Allow external connections if needed
|
||||
PORT = int(os.environ.get("NEXUS_WS_PORT", "8765"))
|
||||
HOST = os.environ.get("NEXUS_WS_HOST", "127.0.0.1") # Default to localhost only
|
||||
AUTH_TOKEN = os.environ.get("NEXUS_WS_TOKEN", "") # Empty = no auth required
|
||||
RATE_LIMIT_WINDOW = 60 # seconds
|
||||
RATE_LIMIT_MAX_CONNECTIONS = 10 # max connections per IP per window
|
||||
RATE_LIMIT_MAX_MESSAGES = 100 # max messages per connection per window
|
||||
|
||||
# Logging setup
|
||||
logging.basicConfig(
|
||||
@@ -28,15 +42,97 @@ logger = logging.getLogger("nexus-gateway")
|
||||
|
||||
# State
|
||||
clients: Set[websockets.WebSocketServerProtocol] = set()
|
||||
connection_tracker: Dict[str, list] = defaultdict(list) # IP -> [timestamps]
|
||||
message_tracker: Dict[int, list] = defaultdict(list) # connection_id -> [timestamps]
|
||||
|
||||
def check_rate_limit(ip: str) -> bool:
|
||||
"""Check if IP has exceeded connection rate limit."""
|
||||
now = time.time()
|
||||
# Clean old entries
|
||||
connection_tracker[ip] = [t for t in connection_tracker[ip] if now - t < RATE_LIMIT_WINDOW]
|
||||
|
||||
if len(connection_tracker[ip]) >= RATE_LIMIT_MAX_CONNECTIONS:
|
||||
return False
|
||||
|
||||
connection_tracker[ip].append(now)
|
||||
return True
|
||||
|
||||
def check_message_rate_limit(connection_id: int) -> bool:
|
||||
"""Check if connection has exceeded message rate limit."""
|
||||
now = time.time()
|
||||
# Clean old entries
|
||||
message_tracker[connection_id] = [t for t in message_tracker[connection_id] if now - t < RATE_LIMIT_WINDOW]
|
||||
|
||||
if len(message_tracker[connection_id]) >= RATE_LIMIT_MAX_MESSAGES:
|
||||
return False
|
||||
|
||||
message_tracker[connection_id].append(now)
|
||||
return True
|
||||
|
||||
async def authenticate_connection(websocket: websockets.WebSocketServerProtocol) -> bool:
|
||||
"""Authenticate WebSocket connection using token."""
|
||||
if not AUTH_TOKEN:
|
||||
# No authentication required
|
||||
return True
|
||||
|
||||
try:
|
||||
# Wait for authentication message (first message should be auth)
|
||||
auth_message = await asyncio.wait_for(websocket.recv(), timeout=5.0)
|
||||
auth_data = json.loads(auth_message)
|
||||
|
||||
if auth_data.get("type") != "auth":
|
||||
logger.warning(f"Invalid auth message type from {websocket.remote_address}")
|
||||
return False
|
||||
|
||||
token = auth_data.get("token", "")
|
||||
if token != AUTH_TOKEN:
|
||||
logger.warning(f"Invalid auth token from {websocket.remote_address}")
|
||||
return False
|
||||
|
||||
logger.info(f"Authenticated connection from {websocket.remote_address}")
|
||||
return True
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"Authentication timeout from {websocket.remote_address}")
|
||||
return False
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Invalid auth JSON from {websocket.remote_address}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Authentication error from {websocket.remote_address}: {e}")
|
||||
return False
|
||||
|
||||
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
|
||||
"""Handles individual client connections and message broadcasting."""
|
||||
clients.add(websocket)
|
||||
addr = websocket.remote_address
|
||||
ip = addr[0] if addr else "unknown"
|
||||
connection_id = id(websocket)
|
||||
|
||||
# Check connection rate limit
|
||||
if not check_rate_limit(ip):
|
||||
logger.warning(f"Connection rate limit exceeded for {ip}")
|
||||
await websocket.close(1008, "Rate limit exceeded")
|
||||
return
|
||||
|
||||
# Authenticate if token is required
|
||||
if not await authenticate_connection(websocket):
|
||||
await websocket.close(1008, "Authentication failed")
|
||||
return
|
||||
|
||||
clients.add(websocket)
|
||||
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
# Check message rate limit
|
||||
if not check_message_rate_limit(connection_id):
|
||||
logger.warning(f"Message rate limit exceeded for {addr}")
|
||||
await websocket.send(json.dumps({
|
||||
"type": "error",
|
||||
"message": "Message rate limit exceeded"
|
||||
}))
|
||||
continue
|
||||
|
||||
# Parse for logging/validation if it's JSON
|
||||
try:
|
||||
data = json.loads(message)
|
||||
@@ -81,6 +177,20 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
|
||||
|
||||
async def main():
|
||||
"""Main server loop with graceful shutdown."""
|
||||
# Log security configuration
|
||||
if AUTH_TOKEN:
|
||||
logger.info("Authentication: ENABLED (token required)")
|
||||
else:
|
||||
logger.warning("Authentication: DISABLED (no token required)")
|
||||
|
||||
if HOST == "0.0.0.0":
|
||||
logger.warning("Host binding: 0.0.0.0 (all interfaces) - SECURITY RISK")
|
||||
else:
|
||||
logger.info(f"Host binding: {HOST} (localhost only)")
|
||||
|
||||
logger.info(f"Rate limiting: {RATE_LIMIT_MAX_CONNECTIONS} connections/IP/{RATE_LIMIT_WINDOW}s, "
|
||||
f"{RATE_LIMIT_MAX_MESSAGES} messages/connection/{RATE_LIMIT_WINDOW}s")
|
||||
|
||||
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
|
||||
|
||||
# Set up signal handlers for graceful shutdown
|
||||
|
||||
193
tests/load/websocket_load_test.py
Normal file
193
tests/load/websocket_load_test.py
Normal file
@@ -0,0 +1,193 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
WebSocket Load Test — Benchmark concurrent user sessions on the Nexus gateway.
|
||||
|
||||
Tests:
|
||||
- Concurrent WebSocket connections
|
||||
- Message throughput under load
|
||||
- Memory profiling per connection
|
||||
- Connection failure/recovery
|
||||
|
||||
Usage:
|
||||
python3 tests/load/websocket_load_test.py # default (50 users)
|
||||
python3 tests/load/websocket_load_test.py --users 200 # 200 concurrent
|
||||
python3 tests/load/websocket_load_test.py --duration 60 # 60 second test
|
||||
python3 tests/load/websocket_load_test.py --json # JSON output
|
||||
|
||||
Ref: #1505
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import argparse
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
WS_URL = os.environ.get("WS_URL", "ws://localhost:8765")
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConnectionStats:
|
||||
connected: bool = False
|
||||
connect_time_ms: float = 0
|
||||
messages_sent: int = 0
|
||||
messages_received: int = 0
|
||||
errors: int = 0
|
||||
latencies: List[float] = field(default_factory=list)
|
||||
disconnected: bool = False
|
||||
|
||||
|
||||
async def ws_client(user_id: int, duration: int, stats: ConnectionStats, ws_url: str = WS_URL):
|
||||
"""Single WebSocket client for load testing."""
|
||||
try:
|
||||
import websockets
|
||||
except ImportError:
|
||||
# Fallback: use raw asyncio
|
||||
stats.errors += 1
|
||||
return
|
||||
|
||||
try:
|
||||
start = time.time()
|
||||
async with websockets.connect(ws_url, open_timeout=5) as ws:
|
||||
stats.connect_time_ms = (time.time() - start) * 1000
|
||||
stats.connected = True
|
||||
|
||||
# Send periodic messages for the duration
|
||||
end_time = time.time() + duration
|
||||
msg_count = 0
|
||||
while time.time() < end_time:
|
||||
try:
|
||||
msg_start = time.time()
|
||||
message = json.dumps({
|
||||
"type": "chat",
|
||||
"user": f"load-test-{user_id}",
|
||||
"content": f"Load test message {msg_count} from user {user_id}",
|
||||
})
|
||||
await ws.send(message)
|
||||
stats.messages_sent += 1
|
||||
|
||||
# Wait for response (with timeout)
|
||||
try:
|
||||
response = await asyncio.wait_for(ws.recv(), timeout=5.0)
|
||||
stats.messages_received += 1
|
||||
latency = (time.time() - msg_start) * 1000
|
||||
stats.latencies.append(latency)
|
||||
except asyncio.TimeoutError:
|
||||
stats.errors += 1
|
||||
|
||||
msg_count += 1
|
||||
await asyncio.sleep(0.5) # 2 messages/sec per user
|
||||
|
||||
except websockets.exceptions.ConnectionClosed:
|
||||
stats.disconnected = True
|
||||
break
|
||||
except Exception:
|
||||
stats.errors += 1
|
||||
|
||||
except Exception as e:
|
||||
stats.errors += 1
|
||||
if "Connection refused" in str(e) or "connect" in str(e).lower():
|
||||
pass # Expected if server not running
|
||||
|
||||
|
||||
async def run_load_test(users: int, duration: int, ws_url: str = WS_URL) -> dict:
|
||||
"""Run the load test with N concurrent users."""
|
||||
stats = [ConnectionStats() for _ in range(users)]
|
||||
|
||||
print(f" Starting {users} concurrent connections for {duration}s...")
|
||||
start = time.time()
|
||||
|
||||
tasks = [ws_client(i, duration, stats[i], ws_url) for i in range(users)]
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
total_time = time.time() - start
|
||||
|
||||
# Aggregate results
|
||||
connected = sum(1 for s in stats if s.connected)
|
||||
total_sent = sum(s.messages_sent for s in stats)
|
||||
total_received = sum(s.messages_received for s in stats)
|
||||
total_errors = sum(s.errors for s in stats)
|
||||
disconnected = sum(1 for s in stats if s.disconnected)
|
||||
|
||||
all_latencies = []
|
||||
for s in stats:
|
||||
all_latencies.extend(s.latencies)
|
||||
|
||||
avg_latency = sum(all_latencies) / len(all_latencies) if all_latencies else 0
|
||||
p95_latency = sorted(all_latencies)[int(len(all_latencies) * 0.95)] if all_latencies else 0
|
||||
p99_latency = sorted(all_latencies)[int(len(all_latencies) * 0.99)] if all_latencies else 0
|
||||
|
||||
avg_connect_time = sum(s.connect_time_ms for s in stats if s.connected) / connected if connected else 0
|
||||
|
||||
return {
|
||||
"users": users,
|
||||
"duration_seconds": round(total_time, 1),
|
||||
"connected": connected,
|
||||
"connect_rate": round(connected / users * 100, 1),
|
||||
"messages_sent": total_sent,
|
||||
"messages_received": total_received,
|
||||
"throughput_msg_per_sec": round(total_sent / total_time, 1) if total_time > 0 else 0,
|
||||
"avg_latency_ms": round(avg_latency, 1),
|
||||
"p95_latency_ms": round(p95_latency, 1),
|
||||
"p99_latency_ms": round(p99_latency, 1),
|
||||
"avg_connect_time_ms": round(avg_connect_time, 1),
|
||||
"errors": total_errors,
|
||||
"disconnected": disconnected,
|
||||
}
|
||||
|
||||
|
||||
def print_report(result: dict):
|
||||
"""Print load test report."""
|
||||
print(f"\n{'='*60}")
|
||||
print(f" WEBSOCKET LOAD TEST REPORT")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
print(f" Connections: {result['connected']}/{result['users']} ({result['connect_rate']}%)")
|
||||
print(f" Duration: {result['duration_seconds']}s")
|
||||
print(f" Messages sent: {result['messages_sent']}")
|
||||
print(f" Messages recv: {result['messages_received']}")
|
||||
print(f" Throughput: {result['throughput_msg_per_sec']} msg/s")
|
||||
print(f" Avg connect: {result['avg_connect_time_ms']}ms")
|
||||
print()
|
||||
print(f" Latency:")
|
||||
print(f" Avg: {result['avg_latency_ms']}ms")
|
||||
print(f" P95: {result['p95_latency_ms']}ms")
|
||||
print(f" P99: {result['p99_latency_ms']}ms")
|
||||
print()
|
||||
print(f" Errors: {result['errors']}")
|
||||
print(f" Disconnected: {result['disconnected']}")
|
||||
|
||||
# Verdict
|
||||
if result['connect_rate'] >= 95 and result['errors'] == 0:
|
||||
print(f"\n ✅ PASS")
|
||||
elif result['connect_rate'] >= 80:
|
||||
print(f"\n ⚠️ DEGRADED")
|
||||
else:
|
||||
print(f"\n ❌ FAIL")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="WebSocket Load Test")
|
||||
parser.add_argument("--users", type=int, default=50, help="Concurrent users")
|
||||
parser.add_argument("--duration", type=int, default=30, help="Test duration in seconds")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output")
|
||||
parser.add_argument("--url", default=WS_URL, help="WebSocket URL")
|
||||
args = parser.parse_args()
|
||||
|
||||
ws_url = args.url
|
||||
|
||||
print(f"\nWebSocket Load Test — {args.users} users, {args.duration}s\n")
|
||||
|
||||
result = asyncio.run(run_load_test(args.users, args.duration, ws_url))
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
print_report(result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
67
tests/test_zero_touch_forge_readiness.py
Normal file
67
tests/test_zero_touch_forge_readiness.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from scripts.zero_touch_forge_readiness import evaluate_readiness, load_spec
|
||||
|
||||
|
||||
DOC = Path("docs/zero-touch-forge-readiness.md")
|
||||
|
||||
|
||||
def test_load_spec_contains_all_impossible_bar_checks():
|
||||
spec = load_spec()
|
||||
check_ids = [item["id"] for item in spec["checks"]]
|
||||
assert check_ids == [
|
||||
"os_bootstrap",
|
||||
"integrity_validation",
|
||||
"secret_distribution",
|
||||
"stack_startup",
|
||||
"test_gate",
|
||||
"checkpoint_restore",
|
||||
"post_boot_notification",
|
||||
"sixty_minute_sla",
|
||||
]
|
||||
|
||||
|
||||
def test_evaluate_readiness_marks_missing_components_as_blockers():
|
||||
spec = load_spec()
|
||||
result = evaluate_readiness(
|
||||
spec,
|
||||
file_exists={
|
||||
"scripts/provision-runner.sh": True,
|
||||
"scripts/lazarus_checkpoint.py": True,
|
||||
"operations/fleet-topology.md": True,
|
||||
"docker-compose.yml": False,
|
||||
"fleet/fleet-routing.json": False,
|
||||
"tests/test_bootstrap_contract.py": False,
|
||||
},
|
||||
signal_flags={
|
||||
"has_age_seed_flow": False,
|
||||
"has_crypto_integrity_verification": False,
|
||||
"has_stack_start_manifest": False,
|
||||
"has_test_gate": False,
|
||||
"has_notification_step": False,
|
||||
"has_sla_budget": False,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["ready_count"] == 2
|
||||
blocked = {item["id"] for item in result["blocked_checks"]}
|
||||
assert blocked == {
|
||||
"integrity_validation",
|
||||
"secret_distribution",
|
||||
"stack_startup",
|
||||
"test_gate",
|
||||
"post_boot_notification",
|
||||
"sixty_minute_sla",
|
||||
}
|
||||
|
||||
|
||||
def test_document_exists_with_required_sections():
|
||||
assert DOC.exists(), "expected zero-touch forge readiness doc to exist"
|
||||
content = DOC.read_text()
|
||||
assert "# Zero-Touch Forge Readiness" in content
|
||||
assert "## Impossible Goal" in content
|
||||
assert "## Current Readiness Table" in content
|
||||
assert "## Next Concrete Build Steps" in content
|
||||
Reference in New Issue
Block a user