#!/usr/bin/env python3
"""
Timmy Academy - Automated Stress Test (Fenrir Protocol)

Simulates multiple concurrent players connecting to the Timmy Academy MUD,
performing random actions, and measuring system performance under load.

Usage:
    python tests/stress_test.py [--players N] [--duration SECS]
                                [--actions-per-second N] [--host HOST] [--port PORT]

Examples:
    python tests/stress_test.py   # defaults: 10 players, 30s, 2 actions/sec
    python tests/stress_test.py --players 25 --duration 60 --actions-per-second 5
    python tests/stress_test.py --host 167.99.126.228 --port 4000 --players 50

Requirements: Python 3.8+ (stdlib only, no external dependencies)
"""

import argparse
import asyncio
import json
import os
import random
import statistics
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

# =============================================================================
# Configuration
# =============================================================================

DEFAULT_HOST = "localhost"
DEFAULT_PORT = 4000
DEFAULT_PLAYERS = 10
DEFAULT_DURATION = 30  # seconds
DEFAULT_ACTIONS_PER_SECOND = 2.0
TELNET_TIMEOUT = 10  # seconds

# Actions a virtual player can perform, with relative weights.
PLAYER_ACTIONS = [
    ("look", 20),                # Look at current room
    ("north", 8),                # Move north
    ("south", 8),                # Move south
    ("east", 8),                 # Move east
    ("west", 8),                 # Move west
    ("up", 4),                   # Move up
    ("down", 4),                 # Move down
    ("examine", 10),             # Examine room or object
    ("@status", 6),              # Check agent status
    ("@map", 5),                 # View map
    ("@academy", 3),             # Academy overview
    ("rooms", 3),                # List rooms
    ("smell", 5),                # Smell the room
    ("listen", 5),               # Listen to the room
    ("say Hello everyone!", 3),  # Say something
]

# Expand each (action, weight) pair into `weight` copies so a uniform
# random.choice() over this flat list reproduces the weighted distribution.
WEIGHTED_ACTIONS = [action for action, weight in PLAYER_ACTIONS for _ in range(weight)]
# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class ActionResult:
    """Result of a single action execution."""

    player_id: int
    action: str
    latency_ms: float
    success: bool
    error: Optional[str] = None
    # Wall-clock time of the action; used for reporting, not latency math.
    timestamp: float = field(default_factory=time.time)


@dataclass
class PlayerStats:
    """Accumulated stats for a single virtual player."""

    player_id: int
    actions_completed: int = 0
    actions_failed: int = 0
    errors: list = field(default_factory=list)
    latencies: list = field(default_factory=list)
    connected: bool = False
    connect_time_ms: float = 0.0


@dataclass
class StressTestReport:
    """Final aggregated report from the stress test."""

    start_time: str = ""
    end_time: str = ""
    duration_seconds: float = 0.0
    host: str = ""
    port: int = 0
    num_players: int = 0
    target_actions_per_second: float = 0.0
    total_actions: int = 0
    successful_actions: int = 0
    failed_actions: int = 0
    error_rate_percent: float = 0.0
    throughput_actions_per_sec: float = 0.0
    latency_min_ms: float = 0.0
    latency_max_ms: float = 0.0
    latency_mean_ms: float = 0.0
    latency_median_ms: float = 0.0
    latency_p90_ms: float = 0.0
    latency_p95_ms: float = 0.0
    latency_p99_ms: float = 0.0
    connections_succeeded: int = 0
    connections_failed: int = 0
    avg_connect_time_ms: float = 0.0
    action_breakdown: dict = field(default_factory=dict)
    top_errors: list = field(default_factory=list)
    player_summaries: list = field(default_factory=list)


# =============================================================================
# Telnet Client (minimal, stdlib only)
# =============================================================================

class MudClient:
    """Minimal async telnet client for Evennia MUD interaction."""

    def __init__(self, host: str, port: int, player_id: int):
        self.host = host
        self.port = port
        self.player_id = player_id
        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self.connected = False

    async def connect(self) -> float:
        """Connect to the MUD. Returns connection time in ms.

        Raises:
            ConnectionError: if the TCP connect or banner read fails/times out.
        """
        # perf_counter() is monotonic, so measured durations can never go
        # negative if the wall clock is adjusted (NTP, DST) mid-test.
        start = time.perf_counter()
        try:
            self.reader, self.writer = await asyncio.wait_for(
                asyncio.open_connection(self.host, self.port),
                timeout=TELNET_TIMEOUT
            )
            # Read initial banner/login prompt
            await asyncio.wait_for(self._read_until_prompt(), timeout=TELNET_TIMEOUT)
            self.connected = True
            return (time.perf_counter() - start) * 1000
        except Exception as e:
            self.connected = False
            # Chain the original exception so the root cause stays visible.
            raise ConnectionError(f"Player {self.player_id}: Failed to connect: {e}") from e

    async def disconnect(self):
        """Gracefully disconnect."""
        self.connected = False
        if self.writer:
            try:
                self.writer.close()
                await self.writer.wait_closed()
            except Exception:
                # Best-effort close; the peer may already be gone.
                pass

    async def send_command(self, command: str) -> tuple[float, str]:
        """
        Send a command and wait for response.
        Returns (latency_ms, response_text).

        Raises:
            ConnectionError: if not connected.
            TimeoutError: if no response arrives within TELNET_TIMEOUT.
            RuntimeError: for any other transport error.
        """
        if not self.connected or not self.writer:
            raise ConnectionError("Not connected")
        start = time.perf_counter()
        try:
            # Send command with newline
            self.writer.write(f"{command}\r\n".encode("utf-8"))
            await self.writer.drain()
            # Read response until we see a prompt character
            response = await asyncio.wait_for(
                self._read_until_prompt(),
                timeout=TELNET_TIMEOUT
            )
            latency = (time.perf_counter() - start) * 1000
            return latency, response
        except asyncio.TimeoutError:
            latency = (time.perf_counter() - start) * 1000
            raise TimeoutError(f"Timeout after {latency:.0f}ms waiting for response to '{command}'")
        except Exception as e:
            latency = (time.perf_counter() - start) * 1000
            raise RuntimeError(f"Error after {latency:.0f}ms: {e}") from e

    async def _read_until_prompt(self, max_bytes: int = 8192) -> str:
        """Read data until we see a prompt indicator or buffer limit."""
        buffer = b""
        prompt_chars = (b">", b"]", b":")  # Common MUD prompt endings
        while len(buffer) < max_bytes:
            try:
                chunk = await asyncio.wait_for(
                    self.reader.read(1024),
                    timeout=2.0
                )
                if not chunk:
                    break
                buffer += chunk
                # Check if we've received a complete response
                # (ends with prompt char or we have enough data)
                if any(buffer.rstrip().endswith(pc) for pc in prompt_chars):
                    break
                if len(buffer) > 512:
                    # Got enough data, don't wait forever
                    break
            except asyncio.TimeoutError:
                # No more data coming
                break
            except Exception:
                break
        return buffer.decode("utf-8", errors="replace")
# =============================================================================
# Virtual Player
# =============================================================================

class VirtualPlayer:
    """Simulates a single player performing random actions.

    Lifecycle (see run()): connect -> login -> random actions until the shared
    stop event fires -> disconnect. Every action outcome is pushed onto the
    shared results queue for the test runner to aggregate.
    """

    def __init__(self, player_id: int, host: str, port: int,
                 actions_per_second: float, stop_event: asyncio.Event,
                 results_queue: asyncio.Queue):
        self.player_id = player_id
        self.host = host
        self.port = port
        self.actions_per_second = actions_per_second
        self.stop_event = stop_event
        self.results_queue = results_queue
        self.stats = PlayerStats(player_id=player_id)
        self.client = MudClient(host, port, player_id)
        self.action_count = 0

    async def run(self):
        """Main player loop: connect, perform actions, disconnect."""
        try:
            # Connect
            connect_ms = await self.client.connect()
            self.stats.connected = True
            self.stats.connect_time_ms = connect_ms

            # Log in with a unique character name
            await self._login()

            # Perform actions until stopped
            interval = 1.0 / self.actions_per_second
            while not self.stop_event.is_set():
                action = random.choice(WEIGHTED_ACTIONS)
                await self._perform_action(action)

                # Jitter the interval +/- 30% so players don't act in lockstep
                jitter = interval * random.uniform(0.7, 1.3)
                try:
                    await asyncio.wait_for(
                        self.stop_event.wait(),
                        timeout=jitter
                    )
                    break  # Stop event was set
                except asyncio.TimeoutError:
                    pass  # Timeout is expected, continue loop

        except ConnectionError as e:
            # Connection failures are recorded as a failed "connect" action so
            # they show up in the aggregated report.
            self.stats.errors.append(str(e))
            await self.results_queue.put(ActionResult(
                player_id=self.player_id,
                action="connect",
                latency_ms=0,
                success=False,
                error=str(e)
            ))
        except Exception as e:
            self.stats.errors.append(f"Unexpected: {e}")
        finally:
            await self.client.disconnect()

    async def _login(self):
        """Handle Evennia login flow (best-effort)."""
        # Send character name to connect/create
        name = f"StressBot{self.player_id:03d}"
        try:
            # Evennia login: send name, then handle the response
            _, response = await self.client.send_command(name)
            # If asked for password, send a simple one
            if "password" in response.lower() or "create" in response.lower():
                await self.client.send_command("stress123")
            # Wait for game prompt
            await asyncio.sleep(0.5)
        except Exception:
            # Login might fail if account doesn't exist, that's ok
            # The player will still be in the login flow and can issue commands
            pass

    async def _perform_action(self, action: str):
        """Execute a single action and record results."""
        self.action_count += 1
        result = ActionResult(
            player_id=self.player_id,
            action=action,
            latency_ms=0,
            success=False
        )
        try:
            latency, response = await self.client.send_command(action)
            result.latency_ms = latency
            result.success = True
            self.stats.actions_completed += 1
            self.stats.latencies.append(latency)
        except Exception as e:
            result.success = False
            result.error = str(e)
            # getattr with a default already handles the missing-attribute
            # case; the previous `... if hasattr(e, 'latency_ms') else 0` was
            # redundant. No exception type here sets latency_ms, so this is 0.
            result.latency_ms = getattr(e, "latency_ms", 0)
            self.stats.actions_failed += 1
            self.stats.errors.append(str(e))

        await self.results_queue.put(result)
# =============================================================================
# Test Runner
# =============================================================================

class StressTestRunner:
    """Orchestrates the full stress test.

    Spawns `num_players` VirtualPlayer tasks against host:port, runs them for
    `duration` seconds, aggregates every ActionResult into a StressTestReport,
    then prints the report and saves it as JSON under the tests/ directory.
    """

    def __init__(self, host: str, port: int, num_players: int,
                 duration: float, actions_per_second: float):
        self.host = host
        self.port = port
        self.num_players = num_players
        self.duration = duration
        self.actions_per_second = actions_per_second
        self.results: list[ActionResult] = []
        self.player_stats: dict[int, PlayerStats] = {}
        self.start_time: Optional[datetime] = None
        self.end_time: Optional[datetime] = None

    async def run(self) -> StressTestReport:
        """Execute the full stress test and return report."""
        print(f"\n{'='*60}")
        print(f" TIMMY ACADEMY - Fenrir Stress Test Protocol")
        print(f"{'='*60}")
        print(f" Target: {self.host}:{self.port}")
        print(f" Players: {self.num_players}")
        print(f" Duration: {self.duration}s")
        print(f" Rate: {self.actions_per_second} actions/sec/player")
        print(f" Expected: ~{int(self.num_players * self.actions_per_second * self.duration)} total actions")
        print(f"{'='*60}\n")

        self.start_time = datetime.now(timezone.utc)
        stop_event = asyncio.Event()
        results_queue = asyncio.Queue()

        # Create virtual players
        players = [
            VirtualPlayer(
                player_id=i,
                host=self.host,
                port=self.port,
                actions_per_second=self.actions_per_second,
                stop_event=stop_event,
                results_queue=results_queue
            )
            for i in range(self.num_players)
        ]

        # Start all players concurrently
        print(f"[{self._timestamp()}] Launching {self.num_players} virtual players...")
        tasks = [asyncio.create_task(player.run()) for player in players]

        # Collect results while players run
        collector_task = asyncio.create_task(
            self._collect_results(results_queue, stop_event, len(players))
        )

        # Wait for duration
        print(f"[{self._timestamp()}] Running for {self.duration} seconds...")
        try:
            await asyncio.sleep(self.duration)
        except KeyboardInterrupt:
            print("\n[!] Interrupted by user")

        # Signal stop
        stop_event.set()
        print(f"[{self._timestamp()}] Stopping players...")

        # Wait for all players to finish (with timeout).
        # BUG FIX: tasks still pending after the timeout were previously left
        # running; cancel them so they don't leak into the event loop while
        # the report is generated.
        done, pending = await asyncio.wait(tasks, timeout=10)
        for task in pending:
            task.cancel()

        # Drain remaining results
        await asyncio.sleep(0.5)
        while not results_queue.empty():
            try:
                result = results_queue.get_nowait()
                self.results.append(result)
            except asyncio.QueueEmpty:
                break

        self.end_time = datetime.now(timezone.utc)

        # Collect player stats
        for player in players:
            self.player_stats[player.player_id] = player.stats

        # Generate report
        report = self._generate_report()
        self._print_report(report)
        self._save_report(report)
        return report

    async def _collect_results(self, queue: asyncio.Queue,
                               stop_event: asyncio.Event, num_players: int):
        """Background task to collect action results."""
        # Keep draining until the stop event fires AND the queue is empty, so
        # results arriving during shutdown are not dropped.
        while not stop_event.is_set() or not queue.empty():
            try:
                result = await asyncio.wait_for(queue.get(), timeout=0.5)
                self.results.append(result)
                # Progress indicator every 50 actions
                total = len(self.results)
                if total % 50 == 0:
                    elapsed = (datetime.now(timezone.utc) - self.start_time).total_seconds()
                    rate = total / elapsed if elapsed > 0 else 0
                    print(f" [{self._timestamp()}] {total} actions completed "
                          f"({rate:.1f} actions/sec)")
            except asyncio.TimeoutError:
                continue
            except Exception:
                continue

    def _generate_report(self) -> StressTestReport:
        """Aggregate all results into a final report."""
        report = StressTestReport()
        report.start_time = self.start_time.isoformat() if self.start_time else ""
        report.end_time = self.end_time.isoformat() if self.end_time else ""
        report.duration_seconds = (
            (self.end_time - self.start_time).total_seconds()
            if self.start_time and self.end_time else 0
        )
        report.host = self.host
        report.port = self.port
        report.num_players = self.num_players
        report.target_actions_per_second = self.actions_per_second

        # Aggregate actions
        all_latencies = []
        action_counts = defaultdict(int)
        action_latencies = defaultdict(list)
        error_counts = defaultdict(int)

        for r in self.results:
            action_counts[r.action] += 1
            if r.success:
                all_latencies.append(r.latency_ms)
                action_latencies[r.action].append(r.latency_ms)
            else:
                report.failed_actions += 1
                if r.error:
                    error_counts[r.error] += 1

        report.total_actions = len(self.results)
        report.successful_actions = report.total_actions - report.failed_actions
        if report.total_actions > 0:
            report.error_rate_percent = (report.failed_actions / report.total_actions) * 100
        if report.duration_seconds > 0:
            report.throughput_actions_per_sec = report.total_actions / report.duration_seconds

        # Latency percentiles
        if all_latencies:
            sorted_lat = sorted(all_latencies)
            n = len(sorted_lat)

            def pct(p: float) -> float:
                # Clamp the index so small samples can never run past the end.
                return sorted_lat[min(int(n * p), n - 1)]

            report.latency_min_ms = sorted_lat[0]
            report.latency_max_ms = sorted_lat[-1]
            report.latency_mean_ms = statistics.mean(sorted_lat)
            report.latency_median_ms = statistics.median(sorted_lat)
            report.latency_p90_ms = pct(0.90)
            report.latency_p95_ms = pct(0.95)
            report.latency_p99_ms = pct(0.99)

        # Action breakdown, most frequent first
        for action, count in sorted(action_counts.items(), key=lambda x: -x[1]):
            lats = action_latencies.get(action, [])
            report.action_breakdown[action] = {
                "count": count,
                "avg_latency_ms": round(statistics.mean(lats), 2) if lats else 0,
                "success_rate": round(
                    (len(lats) / count * 100) if count > 0 else 0, 1
                )
            }

        # Connection stats
        connect_times = []
        for ps in self.player_stats.values():
            if ps.connected:
                report.connections_succeeded += 1
                connect_times.append(ps.connect_time_ms)
            else:
                report.connections_failed += 1
        if connect_times:
            report.avg_connect_time_ms = statistics.mean(connect_times)

        # Top errors
        report.top_errors = [
            {"error": err, "count": count}
            for err, count in sorted(error_counts.items(), key=lambda x: -x[1])[:10]
        ]

        # Player summaries
        for pid, ps in sorted(self.player_stats.items()):
            report.player_summaries.append({
                "player_id": pid,
                "connected": ps.connected,
                "actions_completed": ps.actions_completed,
                "actions_failed": ps.actions_failed,
                "avg_latency_ms": round(statistics.mean(ps.latencies), 2) if ps.latencies else 0,
                "error_count": len(ps.errors),
            })
        return report

    def _print_report(self, report: StressTestReport):
        """Print formatted report to console."""
        print(f"\n{'='*60}")
        print(f" STRESS TEST REPORT - Fenrir Protocol")
        print(f"{'='*60}")
        print(f"\n --- Test Parameters ---")
        print(f" Start: {report.start_time}")
        print(f" End: {report.end_time}")
        print(f" Duration: {report.duration_seconds:.1f}s")
        print(f" Target: {report.host}:{report.port}")
        print(f" Players: {report.num_players}")
        print(f" Rate/Player:{report.target_actions_per_second} actions/sec")
        print(f"\n --- Throughput ---")
        print(f" Total Actions: {report.total_actions}")
        print(f" Successful: {report.successful_actions}")
        print(f" Failed: {report.failed_actions}")
        print(f" Error Rate: {report.error_rate_percent:.2f}%")
        print(f" Throughput: {report.throughput_actions_per_sec:.2f} actions/sec")
        print(f"\n --- Latency (ms) ---")
        print(f" Min: {report.latency_min_ms:.1f}")
        print(f" Mean: {report.latency_mean_ms:.1f}")
        print(f" Median: {report.latency_median_ms:.1f}")
        print(f" P90: {report.latency_p90_ms:.1f}")
        print(f" P95: {report.latency_p95_ms:.1f}")
        print(f" P99: {report.latency_p99_ms:.1f}")
        print(f" Max: {report.latency_max_ms:.1f}")
        print(f"\n --- Connections ---")
        print(f" Succeeded: {report.connections_succeeded}")
        print(f" Failed: {report.connections_failed}")
        print(f" Avg Time: {report.avg_connect_time_ms:.1f}ms")
        print(f"\n --- Action Breakdown ---")
        print(f" {'Action':<20} {'Count':>8} {'Avg(ms)':>10} {'Success%':>10}")
        print(f" {'-'*48}")
        for action, info in report.action_breakdown.items():
            print(f" {action:<20} {info['count']:>8} "
                  f"{info['avg_latency_ms']:>10.1f} {info['success_rate']:>9.1f}%")
        if report.top_errors:
            print(f"\n --- Top Errors ---")
            for err_info in report.top_errors[:5]:
                err_msg = err_info['error'][:50]
                print(f" [{err_info['count']}x] {err_msg}")
        # Player summary (compact)
        print(f"\n --- Player Summary (top 10 by actions) ---")
        sorted_players = sorted(
            report.player_summaries,
            key=lambda p: p['actions_completed'],
            reverse=True
        )[:10]
        print(f" {'Player':<12} {'Done':>6} {'Fail':>6} {'Avg(ms)':>10} {'Status':<10}")
        print(f" {'-'*48}")
        for ps in sorted_players:
            status = "OK" if ps['connected'] else "FAILED"
            print(f" #{ps['player_id']:<11} {ps['actions_completed']:>6} "
                  f"{ps['actions_failed']:>6} {ps['avg_latency_ms']:>10.1f} {status}")
        print(f"\n{'='*60}")
        print(f" Verdict: ", end="")
        # Thresholds: <1% errors and sub-second P95 is a pass; <5% / <3s is a
        # warning; anything worse needs attention.
        if report.error_rate_percent < 1 and report.latency_p95_ms < 1000:
            print("PASSED - System handles load well")
        elif report.error_rate_percent < 5 and report.latency_p95_ms < 3000:
            print("WARNING - Acceptable but room for improvement")
        else:
            print("NEEDS ATTENTION - High error rate or latency")
        print(f"{'='*60}\n")

    def _save_report(self, report: StressTestReport):
        """Save report to JSON file under the project's tests/ directory."""
        report_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "tests")
        os.makedirs(report_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        filename = os.path.join(report_dir, f"stress_report_{timestamp}.json")

        # Convert to dict for JSON serialization
        report_dict = {
            "test_name": "Fenrir Stress Test",
            "start_time": report.start_time,
            "end_time": report.end_time,
            "duration_seconds": report.duration_seconds,
            "target": {"host": report.host, "port": report.port},
            "parameters": {
                "num_players": report.num_players,
                "target_actions_per_second": report.target_actions_per_second,
            },
            "results": {
                "total_actions": report.total_actions,
                "successful_actions": report.successful_actions,
                "failed_actions": report.failed_actions,
                "error_rate_percent": round(report.error_rate_percent, 2),
                "throughput_actions_per_sec": round(report.throughput_actions_per_sec, 2),
            },
            "latency_ms": {
                "min": round(report.latency_min_ms, 2),
                "mean": round(report.latency_mean_ms, 2),
                "median": round(report.latency_median_ms, 2),
                "p90": round(report.latency_p90_ms, 2),
                "p95": round(report.latency_p95_ms, 2),
                "p99": round(report.latency_p99_ms, 2),
                "max": round(report.latency_max_ms, 2),
            },
            "connections": {
                "succeeded": report.connections_succeeded,
                "failed": report.connections_failed,
                "avg_connect_time_ms": round(report.avg_connect_time_ms, 2),
            },
            "action_breakdown": report.action_breakdown,
            "top_errors": report.top_errors,
            "player_summaries": report.player_summaries,
        }
        with open(filename, "w") as f:
            json.dump(report_dict, f, indent=2)
        # BUG FIX: this previously printed a literal placeholder instead of
        # the path the report was actually written to.
        print(f" Report saved: {filename}")

    @staticmethod
    def _timestamp() -> str:
        """Short HH:MM:SS wall-clock stamp for progress lines."""
        return datetime.now().strftime("%H:%M:%S")
# =============================================================================
# Self-Test (no server required)
# =============================================================================

def run_self_test():
    """
    Run a lightweight self-test that validates the stress test logic
    without requiring a running MUD server.

    Returns True when every check passed, False otherwise.
    """
    print(f"\n{'='*60}")
    print(f" SELF-TEST MODE - Validation Suite")
    print(f"{'='*60}\n")

    passed = 0
    failed = 0

    def check(name, condition, detail=""):
        # Tally and print one PASS/FAIL line per assertion.
        nonlocal passed, failed
        if condition:
            print(f" [PASS] {name}")
            passed += 1
        else:
            print(f" [FAIL] {name} - {detail}")
            failed += 1

    # Test 1: Weighted actions list is populated
    check("Weighted actions list not empty", len(WEIGHTED_ACTIONS) > 0)
    check("Weighted actions has correct items",
          "look" in WEIGHTED_ACTIONS and "north" in WEIGHTED_ACTIONS)

    # Test 2: ActionResult creation
    result = ActionResult(player_id=1, action="look", latency_ms=42.5, success=True)
    check("ActionResult dataclass works", result.player_id == 1 and result.success)
    check("ActionResult has timestamp", result.timestamp > 0)

    # Test 3: PlayerStats creation
    stats = PlayerStats(player_id=1)
    check("PlayerStats dataclass works",
          stats.player_id == 1 and stats.actions_completed == 0)

    # Test 4: StressTestReport creation
    report = StressTestReport()
    check("StressTestReport dataclass works", report.total_actions == 0)

    # Test 5: Action distribution is reasonable
    action_freq = defaultdict(int)
    for a in WEIGHTED_ACTIONS:
        action_freq[a] += 1
    check("Multiple action types present", len(action_freq) >= 10)
    # BUG FIX: previously this only compared "look" against one arbitrary
    # action (@academy); now it verifies "look" is the mode of the whole
    # distribution, matching the check's name.
    check("'look' is most common action",
          all(action_freq["look"] >= count for count in action_freq.values()))

    # Test 6: Report generation with mock data
    runner = StressTestRunner("localhost", 4000, 5, 10, 1.0)
    runner.start_time = datetime.now(timezone.utc)
    runner.end_time = datetime.now(timezone.utc)
    # Add mock results
    for i in range(100):
        runner.results.append(ActionResult(
            player_id=i % 5,
            action=random.choice(WEIGHTED_ACTIONS),
            latency_ms=random.uniform(10, 500),
            success=random.random() > 0.05
        ))
    # Add mock player stats
    for i in range(5):
        runner.player_stats[i] = PlayerStats(
            player_id=i,
            actions_completed=18,
            actions_failed=2,
            connected=True,
            connect_time_ms=random.uniform(50, 200),
            latencies=[random.uniform(10, 500) for _ in range(18)]
        )
    report = runner._generate_report()
    check("Report total_actions correct", report.total_actions == 100)
    check("Report has latency stats", report.latency_mean_ms > 0)
    check("Report has action breakdown", len(report.action_breakdown) > 0)
    check("Report throughput calculated", report.throughput_actions_per_sec > 0)
    check("Report connection stats", report.connections_succeeded == 5)

    # Test 7: JSON serialization
    try:
        report_dict = {
            "total_actions": report.total_actions,
            "latency_ms": {
                "mean": round(report.latency_mean_ms, 2),
                "p95": round(report.latency_p95_ms, 2),
            },
            "action_breakdown": report.action_breakdown,
        }
        json_str = json.dumps(report_dict)
        check("Report JSON serializable", len(json_str) > 10)
    except Exception as e:
        check("Report JSON serializable", False, str(e))

    # Summary
    total = passed + failed
    print(f"\n Results: {passed}/{total} passed, {failed} failed")
    print(f"{'='*60}\n")
    return failed == 0
# =============================================================================
# Main Entry Point
# =============================================================================

def main():
    """Parse CLI arguments, then dispatch to the self-test or a live run."""
    arg_parser = argparse.ArgumentParser(
        description="Timmy Academy - Fenrir Stress Test Protocol",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                               # default: 10 players, 30s, 2 act/s
  %(prog)s --players 50 --duration 120   # heavy load test
  %(prog)s --host 167.99.126.228 --port 4000  # test live server
  %(prog)s --self-test                   # validate without server
"""
    )
    arg_parser.add_argument("--players", type=int, default=DEFAULT_PLAYERS,
                            help=f"Number of concurrent virtual players (default: {DEFAULT_PLAYERS})")
    arg_parser.add_argument("--duration", type=float, default=DEFAULT_DURATION,
                            help=f"Test duration in seconds (default: {DEFAULT_DURATION})")
    arg_parser.add_argument("--actions-per-second", type=float, default=DEFAULT_ACTIONS_PER_SECOND,
                            help=f"Actions per second per player (default: {DEFAULT_ACTIONS_PER_SECOND})")
    arg_parser.add_argument("--host", type=str, default=DEFAULT_HOST,
                            help=f"MUD server host (default: {DEFAULT_HOST})")
    arg_parser.add_argument("--port", type=int, default=DEFAULT_PORT,
                            help=f"MUD server telnet port (default: {DEFAULT_PORT})")
    arg_parser.add_argument("--self-test", action="store_true",
                            help="Run self-test validation (no server required)")
    arg_parser.add_argument("--json", action="store_true",
                            help="Output report as JSON to stdout")
    opts = arg_parser.parse_args()

    # Self-test mode needs no server; exit status reflects the outcome.
    if opts.self_test:
        sys.exit(0 if run_self_test() else 1)

    # Live stress test against the configured server.
    stress_runner = StressTestRunner(
        host=opts.host,
        port=opts.port,
        num_players=opts.players,
        duration=opts.duration,
        actions_per_second=opts.actions_per_second,
    )
    try:
        final_report = asyncio.run(stress_runner.run())
        if opts.json:
            # Re-output a compact machine-readable summary
            summary = {
                "total_actions": final_report.total_actions,
                "throughput": final_report.throughput_actions_per_sec,
                "error_rate": final_report.error_rate_percent,
                "latency_p95": final_report.latency_p95_ms,
            }
            print(json.dumps(summary, indent=2))
    except KeyboardInterrupt:
        print("\n[!] Test interrupted")
        sys.exit(130)


if __name__ == "__main__":
    main()