Files
Timmy-time-dashboard/src/infrastructure/world/hardening/stress.py
Timmy Time 4d2aeb937f
Some checks failed
Tests / lint (push) Successful in 11s
Tests / test (push) Has been cancelled
[loop-cycle-7] refactor: split research.py into research/ subpackage (#1405) (#1458)
2026-03-24 19:53:04 +00:00

166 lines
5.3 KiB
Python

"""Multi-client stress runner — validates 6+ concurrent automated agents.
Runs N simultaneous ``MockWorldAdapter`` instances through heartbeat cycles
concurrently via asyncio and collects per-client results. The runner is
the primary gate for Phase 8 multi-player stability requirements.
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.benchmark.scenarios import BenchmarkScenario
from infrastructure.world.types import ActionStatus, CommandInput
logger = logging.getLogger(__name__)
@dataclass
class ClientResult:
    """Outcome record for one simulated client of a stress run.

    Only ``client_id`` is required; every other field starts at a neutral
    default and is filled in while the client executes.
    """

    # Identifier of the simulated client this record belongs to.
    client_id: str
    # Number of cycles counted for this client.
    cycles_completed: int = 0
    # Number of actions counted for this client.
    actions_taken: int = 0
    # Error messages collected for this client; each instance gets its own
    # fresh list via ``default_factory``.
    errors: list[str] = field(default_factory=list)
    # Elapsed wall-clock time for this client, in milliseconds.
    wall_time_ms: int = 0
    # Whether the client is considered to have passed.
    success: bool = False
@dataclass
class StressTestReport:
    """Aggregated report across all simulated clients.

    Holds one ``ClientResult`` per client plus run-level metadata, and
    derives pass/error totals from those per-client results.
    """

    # Number of clients the run was configured with.
    client_count: int
    # Name of the scenario the clients were run against.
    scenario_name: str
    # Per-client results; each report gets its own fresh list.
    results: list[ClientResult] = field(default_factory=list)
    # Elapsed time for the whole run, in milliseconds.
    total_time_ms: int = 0
    # Timestamp string for the run; empty until set by the creator.
    timestamp: str = ""

    @property
    def success_count(self) -> int:
        """Return the number of results whose ``success`` flag is set."""
        return sum(1 for r in self.results if r.success)

    @property
    def error_count(self) -> int:
        """Return the total number of error messages across all results."""
        return sum(len(r.errors) for r in self.results)

    @property
    def all_passed(self) -> bool:
        """Return True when every result succeeded.

        Note that ``all()`` over an empty sequence is True, so a report
        with no results also reports ``all_passed``.
        """
        return all(r.success for r in self.results)

    def summary(self) -> str:
        """Return a multi-line, human-readable summary of the run.

        The first two lines carry the scenario name and the aggregate
        counters. They are followed by one line per client, and one
        ``Error:`` line for each error that client recorded.
        """
        lines = [
            f"=== Stress Test: {self.scenario_name} ===",
            f"Clients: {self.client_count} Passed: {self.success_count} "
            f"Errors: {self.error_count} Time: {self.total_time_ms} ms",
        ]
        for r in self.results:
            status = "OK" if r.success else "FAIL"
            # The ": " separator keeps the client id from running straight
            # into the cycle count (adjacent f-strings concatenate with no
            # space between them).
            lines.append(
                f" [{status}] {r.client_id}: "
                f"{r.cycles_completed} cycles, {r.actions_taken} actions, "
                f"{r.wall_time_ms} ms"
            )
            lines.extend(f" Error: {err}" for err in r.errors)
        return "\n".join(lines)
class MultiClientStressRunner:
"""Run N concurrent automated clients through a scenario.
Each client gets its own ``MockWorldAdapter`` instance. All clients
run their observe/act cycles concurrently via ``asyncio.gather``.
Parameters
----------
client_count:
Number of simultaneous clients. Must be >= 1.
Phase 8 target is 6+ (see ``MIN_CLIENTS_FOR_PHASE8``).
cycles_per_client:
How many observe→act cycles each client executes.
"""
MIN_CLIENTS_FOR_PHASE8 = 6
def __init__(
self,
*,
client_count: int = 6,
cycles_per_client: int = 5,
) -> None:
if client_count < 1:
raise ValueError("client_count must be >= 1")
self._client_count = client_count
self._cycles = cycles_per_client
@property
def meets_phase8_requirement(self) -> bool:
"""True when client_count >= 6 (Phase 8 multi-player target)."""
return self._client_count >= self.MIN_CLIENTS_FOR_PHASE8
async def run(self, scenario: BenchmarkScenario) -> StressTestReport:
"""Launch all clients concurrently and return the aggregated report."""
report = StressTestReport(
client_count=self._client_count,
scenario_name=scenario.name,
timestamp=datetime.now(UTC).isoformat(),
)
suite_start = time.monotonic()
tasks = [self._run_client(f"client-{i:02d}", scenario) for i in range(self._client_count)]
report.results = list(await asyncio.gather(*tasks))
report.total_time_ms = int((time.monotonic() - suite_start) * 1000)
logger.info(
"StressTest '%s': %d/%d clients passed in %d ms",
scenario.name,
report.success_count,
self._client_count,
report.total_time_ms,
)
return report
async def _run_client(
self,
client_id: str,
scenario: BenchmarkScenario,
) -> ClientResult:
result = ClientResult(client_id=client_id)
adapter = MockWorldAdapter(
location=scenario.start_location,
entities=list(scenario.entities),
events=list(scenario.events),
)
adapter.connect()
start = time.monotonic()
try:
for _ in range(self._cycles):
perception = adapter.observe()
result.cycles_completed += 1
cmd = CommandInput(
action="observe",
parameters={"location": perception.location},
)
action_result = adapter.act(cmd)
if action_result.status == ActionStatus.SUCCESS:
result.actions_taken += 1
# Yield to the event loop between cycles
await asyncio.sleep(0)
result.success = True
except Exception as exc:
msg = f"{type(exc).__name__}: {exc}"
result.errors.append(msg)
logger.warning("StressTest client %s failed: %s", client_id, msg)
finally:
adapter.disconnect()
result.wall_time_ms = int((time.monotonic() - start) * 1000)
return result