9 Commits

Author SHA1 Message Date
3afdec9019 Merge PR #21
Merged PR #21: security: add .env to gitignore
2026-04-17 01:52:14 +00:00
Metatron
815f7d38e8 security: add .env to gitignore, create .env.example (#17)
hermes-agent/.env contained API credentials committed to repo.

Fix:
- Add .env to .gitignore (prevent future commits)
- Create .env.example with placeholders
- NOTE: Exposed credentials need immediate rotation
2026-04-15 21:56:18 -04:00
0aa6699356 Merge PR #20: fix: Replace hardcoded path with dynamic derivatio 2026-04-15 06:17:27 +00:00
37cecdf95a fix: Replace hardcoded path with dynamic derivation (closes #18) 2026-04-15 03:45:02 +00:00
395c9f7a66 Merge pull request 'Add @who command - show connected players' (#7) from burn/20260413-0410-who-command into master 2026-04-13 08:14:54 +00:00
Alexander Whitestone
d36660e9eb Add @who command - show connected players with location and idle time 2026-04-13 04:13:03 -04:00
67cc7240b7 [auto-merge] stress test
Auto-merged by PR review bot: stress test
2026-04-10 11:44:34 +00:00
Alexander Whitestone
2329b3df57 feat: Add Fenrir stress test - automated player simulation
Implements issue #5: [Fenrir] Stress Test the Academy

Automated stress test tool that simulates multiple concurrent players
connecting to the MUD via telnet and performing random actions.

Features:
- Configurable concurrent players (--players, default 10)
- Configurable test duration (--duration, default 30s)
- Configurable actions per second per player (--actions-per-second, default 2)
- 14 weighted player actions: look, movement, examine, status,
  map, academy, rooms, smell, listen, say
- Response time measurement with latency percentiles (p50/p90/p95/p99)
- Error rate tracking and top error reporting
- Throughput calculation (actions/second)
- Connection success/failure tracking
- Per-player statistics
- JSON report generation with timestamps
- Self-test mode (--self-test) for validation without server
- No external dependencies (stdlib only)

Usage:
  python tests/stress_test.py --players 25 --duration 60
  python tests/stress_test.py --host 167.99.126.228 --port 4000
  python tests/stress_test.py --self-test
2026-04-10 07:21:19 -04:00
5f2c4b066d Merge pull request 'build: second pass — rich descriptions, custom commands, README' (#2) from build/second-pass into master 2026-04-04 02:00:19 +00:00
6 changed files with 921 additions and 3 deletions

5
.gitignore vendored
View File

@@ -54,3 +54,8 @@ nosetests.xml
# VSCode config # VSCode config
.vscode .vscode
# Environment variables — never commit secrets
.env
*.env
!.env.example

View File

@@ -9,6 +9,7 @@ Includes:
- CmdAcademy: Show overview of all 4 wings - CmdAcademy: Show overview of all 4 wings
- CmdSmell: Use atmosphere data for scents - CmdSmell: Use atmosphere data for scents
- CmdListen: Use atmosphere data for sounds - CmdListen: Use atmosphere data for sounds
- CmdWho: Show who is currently online
""" """
from evennia.commands.command import Command as BaseCommand from evennia.commands.command import Command as BaseCommand
@@ -365,3 +366,71 @@ class CmdListen(BaseCommand):
self.caller.msg(f"\n The mood here is: {atmo['mood']}") self.caller.msg(f"\n The mood here is: {atmo['mood']}")
else: else:
self.caller.msg("You listen carefully but hear nothing unusual.") self.caller.msg("You listen carefully but hear nothing unusual.")
class CmdWho(BaseCommand):
"""
Show who is currently online at the Academy.
Usage:
@who
who
Displays connected players, their locations, and session info.
"""
key = "@who"
aliases = ["who"]
locks = "cmd:all()"
help_category = "Academy"
def func(self):
from evennia.accounts.models import AccountDB
import time
online = AccountDB.objects.filter(db_is_connected=True).order_by('db_key')
count = online.count()
msg = []
msg.append("|c" + "=" * 44 + "|n")
msg.append(f"|c TIMMY ACADEMY - Who is Online ({count})|n")
msg.append("|c" + "=" * 44 + "|n")
if count == 0:
msg.append("\n The Academy halls are empty.")
else:
for account in online:
# Get the character this account is puppeting
char = None
try:
sessions = account.sessions.all()
for sess in sessions:
puppet = sess.puppet
if puppet:
char = puppet
break
except Exception:
pass
name = account.db_key
location = char.location.key if char and char.location else "(nowhere)"
idle = ""
try:
last_cmd = account.db_last_cmd_timestamp
if last_cmd:
idle_secs = time.time() - last_cmd
if idle_secs < 60:
idle = "active"
elif idle_secs < 300:
idle = f"{int(idle_secs // 60)}m idle"
else:
idle = f"{int(idle_secs // 60)}m idle"
except Exception:
idle = "?"
msg.append(f"\n |w{name}|n")
msg.append(f" at |c{location}|n")
if idle:
msg.append(f" [{idle}]")
msg.append("\n|c" + "=" * 44 + "|n")
self.caller.msg("\n".join(msg))

View File

@@ -18,7 +18,7 @@ from evennia import default_cmds
from commands.command import ( from commands.command import (
CmdExamine, CmdRooms, CmdExamine, CmdRooms,
CmdStatus, CmdMap, CmdAcademy, CmdStatus, CmdMap, CmdAcademy,
CmdSmell, CmdListen, CmdSmell, CmdListen, CmdWho,
) )
@@ -45,6 +45,7 @@ class CharacterCmdSet(default_cmds.CharacterCmdSet):
self.add(CmdAcademy) self.add(CmdAcademy)
self.add(CmdSmell) self.add(CmdSmell)
self.add(CmdListen) self.add(CmdListen)
self.add(CmdWho)
class AccountCmdSet(default_cmds.AccountCmdSet): class AccountCmdSet(default_cmds.AccountCmdSet):

15
hermes-agent/.env.example Normal file
View File

@@ -0,0 +1,15 @@
# hermes-agent/.env.example
# Copy to .env and fill in real values. NEVER commit .env to git.
# Ref: #17
# API Keys (rotate if exposed)
KIMI_API_KEY=your-kimi-api-key-here
# Telegram
TELEGRAM_BOT_TOKEN=your-telegram-bot-token-here
TELEGRAM_HOME_CHANNEL=your-channel-id-here
TELEGRAM_HOME_CHANNEL_NAME="Your Channel Name"
TELEGRAM_ALLOWED_USERS=comma-separated-user-ids
# Gitea
GITEA_TOKEN=your-gitea-token-here

828
tests/stress_test.py Normal file
View File

@@ -0,0 +1,828 @@
#!/usr/bin/env python3
"""
Timmy Academy - Automated Stress Test (Fenrir Protocol)
Simulates multiple concurrent players connecting to the Timmy Academy MUD,
performing random actions, and measuring system performance under load.
Usage:
python tests/stress_test.py [--players N] [--duration SECS] [--actions-per-second N] [--host HOST] [--port PORT]
Examples:
python tests/stress_test.py # defaults: 10 players, 30s, 2 actions/sec
python tests/stress_test.py --players 25 --duration 60 --actions-per-second 5
python tests/stress_test.py --host 167.99.126.228 --port 4000 --players 50
Requirements:
Python 3.8+ (stdlib only, no external dependencies)
"""
import argparse
import asyncio
import json
import os
import random
import statistics
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
# =============================================================================
# Configuration
# =============================================================================
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 4000
DEFAULT_PLAYERS = 10
DEFAULT_DURATION = 30 # seconds
DEFAULT_ACTIONS_PER_SECOND = 2.0
TELNET_TIMEOUT = 10 # seconds
# Actions a virtual player can perform, with relative weights
PLAYER_ACTIONS = [
("look", 20), # Look at current room
("north", 8), # Move north
("south", 8), # Move south
("east", 8), # Move east
("west", 8), # Move west
("up", 4), # Move up
("down", 4), # Move down
("examine", 10), # Examine room or object
("@status", 6), # Check agent status
("@map", 5), # View map
("@academy", 3), # Academy overview
("rooms", 3), # List rooms
("smell", 5), # Smell the room
("listen", 5), # Listen to the room
("say Hello everyone!", 3), # Say something
]
# Build weighted action list
WEIGHTED_ACTIONS = []
for action, weight in PLAYER_ACTIONS:
WEIGHTED_ACTIONS.extend([action] * weight)
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class ActionResult:
"""Result of a single action execution."""
player_id: int
action: str
latency_ms: float
success: bool
error: Optional[str] = None
timestamp: float = field(default_factory=time.time)
@dataclass
class PlayerStats:
"""Accumulated stats for a single virtual player."""
player_id: int
actions_completed: int = 0
actions_failed: int = 0
errors: list = field(default_factory=list)
latencies: list = field(default_factory=list)
connected: bool = False
connect_time_ms: float = 0.0
@dataclass
class StressTestReport:
"""Final aggregated report from the stress test."""
start_time: str = ""
end_time: str = ""
duration_seconds: float = 0.0
host: str = ""
port: int = 0
num_players: int = 0
target_actions_per_second: float = 0.0
total_actions: int = 0
successful_actions: int = 0
failed_actions: int = 0
error_rate_percent: float = 0.0
throughput_actions_per_sec: float = 0.0
latency_min_ms: float = 0.0
latency_max_ms: float = 0.0
latency_mean_ms: float = 0.0
latency_median_ms: float = 0.0
latency_p90_ms: float = 0.0
latency_p95_ms: float = 0.0
latency_p99_ms: float = 0.0
connections_succeeded: int = 0
connections_failed: int = 0
avg_connect_time_ms: float = 0.0
action_breakdown: dict = field(default_factory=dict)
top_errors: list = field(default_factory=list)
player_summaries: list = field(default_factory=list)
# =============================================================================
# Telnet Client (minimal, stdlib only)
# =============================================================================
class MudClient:
"""Minimal async telnet client for Evennia MUD interaction."""
def __init__(self, host: str, port: int, player_id: int):
self.host = host
self.port = port
self.player_id = player_id
self.reader: Optional[asyncio.StreamReader] = None
self.writer: Optional[asyncio.StreamWriter] = None
self.connected = False
async def connect(self) -> float:
"""Connect to the MUD. Returns connection time in ms."""
start = time.time()
try:
self.reader, self.writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port),
timeout=TELNET_TIMEOUT
)
# Read initial banner/login prompt
await asyncio.wait_for(self._read_until_prompt(), timeout=TELNET_TIMEOUT)
self.connected = True
return (time.time() - start) * 1000
except Exception as e:
self.connected = False
raise ConnectionError(f"Player {self.player_id}: Failed to connect: {e}")
async def disconnect(self):
"""Gracefully disconnect."""
self.connected = False
if self.writer:
try:
self.writer.close()
await self.writer.wait_closed()
except Exception:
pass
async def send_command(self, command: str) -> tuple[float, str]:
"""
Send a command and wait for response.
Returns (latency_ms, response_text).
"""
if not self.connected or not self.writer:
raise ConnectionError("Not connected")
start = time.time()
try:
# Send command with newline
self.writer.write(f"{command}\r\n".encode("utf-8"))
await self.writer.drain()
# Read response until we see a prompt character
response = await asyncio.wait_for(
self._read_until_prompt(),
timeout=TELNET_TIMEOUT
)
latency = (time.time() - start) * 1000
return latency, response
except asyncio.TimeoutError:
latency = (time.time() - start) * 1000
raise TimeoutError(f"Timeout after {latency:.0f}ms waiting for response to '{command}'")
except Exception as e:
latency = (time.time() - start) * 1000
raise RuntimeError(f"Error after {latency:.0f}ms: {e}")
async def _read_until_prompt(self, max_bytes: int = 8192) -> str:
"""Read data until we see a prompt indicator or buffer limit."""
buffer = b""
prompt_chars = (b">", b"]", b":") # Common MUD prompt endings
while len(buffer) < max_bytes:
try:
chunk = await asyncio.wait_for(
self.reader.read(1024),
timeout=2.0
)
if not chunk:
break
buffer += chunk
# Check if we've received a complete response
# (ends with prompt char or we have enough data)
if any(buffer.rstrip().endswith(pc) for pc in prompt_chars):
break
if len(buffer) > 512:
# Got enough data, don't wait forever
break
except asyncio.TimeoutError:
# No more data coming
break
except Exception:
break
return buffer.decode("utf-8", errors="replace")
# =============================================================================
# Virtual Player
# =============================================================================
class VirtualPlayer:
"""Simulates a single player performing random actions."""
def __init__(self, player_id: int, host: str, port: int,
actions_per_second: float, stop_event: asyncio.Event,
results_queue: asyncio.Queue):
self.player_id = player_id
self.host = host
self.port = port
self.actions_per_second = actions_per_second
self.stop_event = stop_event
self.results_queue = results_queue
self.stats = PlayerStats(player_id=player_id)
self.client = MudClient(host, port, player_id)
self.action_count = 0
async def run(self):
"""Main player loop: connect, perform actions, disconnect."""
try:
# Connect
connect_ms = await self.client.connect()
self.stats.connected = True
self.stats.connect_time_ms = connect_ms
# Log in with a unique character name
await self._login()
# Perform actions until stopped
interval = 1.0 / self.actions_per_second
while not self.stop_event.is_set():
action = random.choice(WEIGHTED_ACTIONS)
await self._perform_action(action)
# Jitter the interval +/- 30%
jitter = interval * random.uniform(0.7, 1.3)
try:
await asyncio.wait_for(
self.stop_event.wait(),
timeout=jitter
)
break # Stop event was set
except asyncio.TimeoutError:
pass # Timeout is expected, continue loop
except ConnectionError as e:
self.stats.errors.append(str(e))
await self.results_queue.put(ActionResult(
player_id=self.player_id,
action="connect",
latency_ms=0,
success=False,
error=str(e)
))
except Exception as e:
self.stats.errors.append(f"Unexpected: {e}")
finally:
await self.client.disconnect()
async def _login(self):
"""Handle Evennia login flow."""
# Send character name to connect/create
name = f"StressBot{self.player_id:03d}"
try:
# Evennia login: send name, then handle the response
latency, response = await self.client.send_command(name)
# If asked for password, send a simple one
if "password" in response.lower() or "create" in response.lower():
await self.client.send_command("stress123")
# Wait for game prompt
await asyncio.sleep(0.5)
except Exception:
# Login might fail if account doesn't exist, that's ok
# The player will still be in the login flow and can issue commands
pass
async def _perform_action(self, action: str):
"""Execute a single action and record results."""
self.action_count += 1
result = ActionResult(
player_id=self.player_id,
action=action,
latency_ms=0,
success=False
)
try:
latency, response = await self.client.send_command(action)
result.latency_ms = latency
result.success = True
self.stats.actions_completed += 1
self.stats.latencies.append(latency)
except Exception as e:
result.success = False
result.error = str(e)
result.latency_ms = getattr(e, 'latency_ms', 0) if hasattr(e, 'latency_ms') else 0
self.stats.actions_failed += 1
self.stats.errors.append(str(e))
await self.results_queue.put(result)
# =============================================================================
# Test Runner
# =============================================================================
class StressTestRunner:
"""Orchestrates the full stress test."""
def __init__(self, host: str, port: int, num_players: int,
duration: float, actions_per_second: float):
self.host = host
self.port = port
self.num_players = num_players
self.duration = duration
self.actions_per_second = actions_per_second
self.results: list[ActionResult] = []
self.player_stats: dict[int, PlayerStats] = {}
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
async def run(self) -> StressTestReport:
"""Execute the full stress test and return report."""
print(f"\n{'='*60}")
print(f" TIMMY ACADEMY - Fenrir Stress Test Protocol")
print(f"{'='*60}")
print(f" Target: {self.host}:{self.port}")
print(f" Players: {self.num_players}")
print(f" Duration: {self.duration}s")
print(f" Rate: {self.actions_per_second} actions/sec/player")
print(f" Expected: ~{int(self.num_players * self.actions_per_second * self.duration)} total actions")
print(f"{'='*60}\n")
self.start_time = datetime.now(timezone.utc)
stop_event = asyncio.Event()
results_queue = asyncio.Queue()
# Create virtual players
players = [
VirtualPlayer(
player_id=i,
host=self.host,
port=self.port,
actions_per_second=self.actions_per_second,
stop_event=stop_event,
results_queue=results_queue
)
for i in range(self.num_players)
]
# Start all players concurrently
print(f"[{self._timestamp()}] Launching {self.num_players} virtual players...")
tasks = [asyncio.create_task(player.run()) for player in players]
# Collect results while players run
collector_task = asyncio.create_task(
self._collect_results(results_queue, stop_event, len(players))
)
# Wait for duration
print(f"[{self._timestamp()}] Running for {self.duration} seconds...")
try:
await asyncio.sleep(self.duration)
except KeyboardInterrupt:
print("\n[!] Interrupted by user")
# Signal stop
stop_event.set()
print(f"[{self._timestamp()}] Stopping players...")
# Wait for all players to finish (with timeout)
await asyncio.wait(tasks, timeout=10)
# Drain remaining results
await asyncio.sleep(0.5)
while not results_queue.empty():
try:
result = results_queue.get_nowait()
self.results.append(result)
except asyncio.QueueEmpty:
break
self.end_time = datetime.now(timezone.utc)
# Collect player stats
for player in players:
self.player_stats[player.player_id] = player.stats
# Generate report
report = self._generate_report()
self._print_report(report)
self._save_report(report)
return report
async def _collect_results(self, queue: asyncio.Queue,
stop_event: asyncio.Event,
num_players: int):
"""Background task to collect action results."""
while not stop_event.is_set() or not queue.empty():
try:
result = await asyncio.wait_for(queue.get(), timeout=0.5)
self.results.append(result)
# Progress indicator every 50 actions
total = len(self.results)
if total % 50 == 0:
elapsed = (datetime.now(timezone.utc) - self.start_time).total_seconds()
rate = total / elapsed if elapsed > 0 else 0
print(f" [{self._timestamp()}] {total} actions completed "
f"({rate:.1f} actions/sec)")
except asyncio.TimeoutError:
continue
except Exception:
continue
def _generate_report(self) -> StressTestReport:
"""Aggregate all results into a final report."""
report = StressTestReport()
report.start_time = self.start_time.isoformat() if self.start_time else ""
report.end_time = self.end_time.isoformat() if self.end_time else ""
report.duration_seconds = (
(self.end_time - self.start_time).total_seconds()
if self.start_time and self.end_time else 0
)
report.host = self.host
report.port = self.port
report.num_players = self.num_players
report.target_actions_per_second = self.actions_per_second
# Aggregate actions
all_latencies = []
action_counts = defaultdict(int)
action_latencies = defaultdict(list)
error_counts = defaultdict(int)
for r in self.results:
action_counts[r.action] += 1
if r.success:
all_latencies.append(r.latency_ms)
action_latencies[r.action].append(r.latency_ms)
else:
report.failed_actions += 1
if r.error:
error_counts[r.error] += 1
report.total_actions = len(self.results)
report.successful_actions = report.total_actions - report.failed_actions
if report.total_actions > 0:
report.error_rate_percent = (report.failed_actions / report.total_actions) * 100
if report.duration_seconds > 0:
report.throughput_actions_per_sec = report.total_actions / report.duration_seconds
# Latency percentiles
if all_latencies:
sorted_lat = sorted(all_latencies)
report.latency_min_ms = sorted_lat[0]
report.latency_max_ms = sorted_lat[-1]
report.latency_mean_ms = statistics.mean(sorted_lat)
report.latency_median_ms = statistics.median(sorted_lat)
report.latency_p90_ms = sorted_lat[int(len(sorted_lat) * 0.90)]
report.latency_p95_ms = sorted_lat[int(len(sorted_lat) * 0.95)]
report.latency_p99_ms = sorted_lat[int(len(sorted_lat) * 0.99)]
# Action breakdown
for action, count in sorted(action_counts.items(), key=lambda x: -x[1]):
lats = action_latencies.get(action, [])
report.action_breakdown[action] = {
"count": count,
"avg_latency_ms": round(statistics.mean(lats), 2) if lats else 0,
"success_rate": round(
(len(lats) / count * 100) if count > 0 else 0, 1
)
}
# Connection stats
connect_times = []
for ps in self.player_stats.values():
if ps.connected:
report.connections_succeeded += 1
connect_times.append(ps.connect_time_ms)
else:
report.connections_failed += 1
if connect_times:
report.avg_connect_time_ms = statistics.mean(connect_times)
# Top errors
report.top_errors = [
{"error": err, "count": count}
for err, count in sorted(error_counts.items(), key=lambda x: -x[1])[:10]
]
# Player summaries
for pid, ps in sorted(self.player_stats.items()):
report.player_summaries.append({
"player_id": pid,
"connected": ps.connected,
"actions_completed": ps.actions_completed,
"actions_failed": ps.actions_failed,
"avg_latency_ms": round(statistics.mean(ps.latencies), 2) if ps.latencies else 0,
"error_count": len(ps.errors),
})
return report
def _print_report(self, report: StressTestReport):
"""Print formatted report to console."""
print(f"\n{'='*60}")
print(f" STRESS TEST REPORT - Fenrir Protocol")
print(f"{'='*60}")
print(f"\n --- Test Parameters ---")
print(f" Start: {report.start_time}")
print(f" End: {report.end_time}")
print(f" Duration: {report.duration_seconds:.1f}s")
print(f" Target: {report.host}:{report.port}")
print(f" Players: {report.num_players}")
print(f" Rate/Player:{report.target_actions_per_second} actions/sec")
print(f"\n --- Throughput ---")
print(f" Total Actions: {report.total_actions}")
print(f" Successful: {report.successful_actions}")
print(f" Failed: {report.failed_actions}")
print(f" Error Rate: {report.error_rate_percent:.2f}%")
print(f" Throughput: {report.throughput_actions_per_sec:.2f} actions/sec")
print(f"\n --- Latency (ms) ---")
print(f" Min: {report.latency_min_ms:.1f}")
print(f" Mean: {report.latency_mean_ms:.1f}")
print(f" Median: {report.latency_median_ms:.1f}")
print(f" P90: {report.latency_p90_ms:.1f}")
print(f" P95: {report.latency_p95_ms:.1f}")
print(f" P99: {report.latency_p99_ms:.1f}")
print(f" Max: {report.latency_max_ms:.1f}")
print(f"\n --- Connections ---")
print(f" Succeeded: {report.connections_succeeded}")
print(f" Failed: {report.connections_failed}")
print(f" Avg Time: {report.avg_connect_time_ms:.1f}ms")
print(f"\n --- Action Breakdown ---")
print(f" {'Action':<20} {'Count':>8} {'Avg(ms)':>10} {'Success%':>10}")
print(f" {'-'*48}")
for action, info in report.action_breakdown.items():
print(f" {action:<20} {info['count']:>8} "
f"{info['avg_latency_ms']:>10.1f} {info['success_rate']:>9.1f}%")
if report.top_errors:
print(f"\n --- Top Errors ---")
for err_info in report.top_errors[:5]:
err_msg = err_info['error'][:50]
print(f" [{err_info['count']}x] {err_msg}")
# Player summary (compact)
print(f"\n --- Player Summary (top 10 by actions) ---")
sorted_players = sorted(
report.player_summaries,
key=lambda p: p['actions_completed'],
reverse=True
)[:10]
print(f" {'Player':<12} {'Done':>6} {'Fail':>6} {'Avg(ms)':>10} {'Status':<10}")
print(f" {'-'*48}")
for ps in sorted_players:
status = "OK" if ps['connected'] else "FAILED"
print(f" #{ps['player_id']:<11} {ps['actions_completed']:>6} "
f"{ps['actions_failed']:>6} {ps['avg_latency_ms']:>10.1f} {status}")
print(f"\n{'='*60}")
print(f" Verdict: ", end="")
if report.error_rate_percent < 1 and report.latency_p95_ms < 1000:
print("PASSED - System handles load well")
elif report.error_rate_percent < 5 and report.latency_p95_ms < 3000:
print("WARNING - Acceptable but room for improvement")
else:
print("NEEDS ATTENTION - High error rate or latency")
print(f"{'='*60}\n")
def _save_report(self, report: StressTestReport):
"""Save report to JSON file."""
report_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "tests")
os.makedirs(report_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
filename = os.path.join(report_dir, f"stress_report_{timestamp}.json")
# Convert to dict for JSON serialization
report_dict = {
"test_name": "Fenrir Stress Test",
"start_time": report.start_time,
"end_time": report.end_time,
"duration_seconds": report.duration_seconds,
"target": {"host": report.host, "port": report.port},
"parameters": {
"num_players": report.num_players,
"target_actions_per_second": report.target_actions_per_second,
},
"results": {
"total_actions": report.total_actions,
"successful_actions": report.successful_actions,
"failed_actions": report.failed_actions,
"error_rate_percent": round(report.error_rate_percent, 2),
"throughput_actions_per_sec": round(report.throughput_actions_per_sec, 2),
},
"latency_ms": {
"min": round(report.latency_min_ms, 2),
"mean": round(report.latency_mean_ms, 2),
"median": round(report.latency_median_ms, 2),
"p90": round(report.latency_p90_ms, 2),
"p95": round(report.latency_p95_ms, 2),
"p99": round(report.latency_p99_ms, 2),
"max": round(report.latency_max_ms, 2),
},
"connections": {
"succeeded": report.connections_succeeded,
"failed": report.connections_failed,
"avg_connect_time_ms": round(report.avg_connect_time_ms, 2),
},
"action_breakdown": report.action_breakdown,
"top_errors": report.top_errors,
"player_summaries": report.player_summaries,
}
with open(filename, "w") as f:
json.dump(report_dict, f, indent=2)
print(f" Report saved: {filename}")
@staticmethod
def _timestamp() -> str:
return datetime.now().strftime("%H:%M:%S")
# =============================================================================
# Self-Test (no server required)
# =============================================================================
def run_self_test():
"""
Run a lightweight self-test that validates the stress test logic
without requiring a running MUD server.
"""
print(f"\n{'='*60}")
print(f" SELF-TEST MODE - Validation Suite")
print(f"{'='*60}\n")
passed = 0
failed = 0
def check(name, condition, detail=""):
nonlocal passed, failed
if condition:
print(f" [PASS] {name}")
passed += 1
else:
print(f" [FAIL] {name} - {detail}")
failed += 1
# Test 1: Weighted actions list is populated
check("Weighted actions list not empty", len(WEIGHTED_ACTIONS) > 0)
check("Weighted actions has correct items",
"look" in WEIGHTED_ACTIONS and "north" in WEIGHTED_ACTIONS)
# Test 2: ActionResult creation
result = ActionResult(player_id=1, action="look", latency_ms=42.5, success=True)
check("ActionResult dataclass works", result.player_id == 1 and result.success)
check("ActionResult has timestamp", result.timestamp > 0)
# Test 3: PlayerStats creation
stats = PlayerStats(player_id=1)
check("PlayerStats dataclass works", stats.player_id == 1 and stats.actions_completed == 0)
# Test 4: StressTestReport creation
report = StressTestReport()
check("StressTestReport dataclass works", report.total_actions == 0)
# Test 5: Action distribution is reasonable
action_freq = defaultdict(int)
for a in WEIGHTED_ACTIONS:
action_freq[a] += 1
check("Multiple action types present", len(action_freq) >= 10)
check("'look' is most common action", action_freq["look"] > action_freq["@academy"])
# Test 6: Report generation with mock data
runner = StressTestRunner("localhost", 4000, 5, 10, 1.0)
runner.start_time = datetime.now(timezone.utc)
runner.end_time = datetime.now(timezone.utc)
# Add mock results
for i in range(100):
runner.results.append(ActionResult(
player_id=i % 5,
action=random.choice(WEIGHTED_ACTIONS),
latency_ms=random.uniform(10, 500),
success=random.random() > 0.05
))
# Add mock player stats
for i in range(5):
runner.player_stats[i] = PlayerStats(
player_id=i,
actions_completed=18,
actions_failed=2,
connected=True,
connect_time_ms=random.uniform(50, 200),
latencies=[random.uniform(10, 500) for _ in range(18)]
)
report = runner._generate_report()
check("Report total_actions correct", report.total_actions == 100)
check("Report has latency stats", report.latency_mean_ms > 0)
check("Report has action breakdown", len(report.action_breakdown) > 0)
check("Report throughput calculated", report.throughput_actions_per_sec > 0)
check("Report connection stats", report.connections_succeeded == 5)
# Test 7: JSON serialization
try:
report_dict = {
"total_actions": report.total_actions,
"latency_ms": {
"mean": round(report.latency_mean_ms, 2),
"p95": round(report.latency_p95_ms, 2),
},
"action_breakdown": report.action_breakdown,
}
json_str = json.dumps(report_dict)
check("Report JSON serializable", len(json_str) > 10)
except Exception as e:
check("Report JSON serializable", False, str(e))
# Summary
total = passed + failed
print(f"\n Results: {passed}/{total} passed, {failed} failed")
print(f"{'='*60}\n")
return failed == 0
# =============================================================================
# Main Entry Point
# =============================================================================
def main():
parser = argparse.ArgumentParser(
description="Timmy Academy - Fenrir Stress Test Protocol",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s # default: 10 players, 30s, 2 act/s
%(prog)s --players 50 --duration 120 # heavy load test
%(prog)s --host 167.99.126.228 --port 4000 # test live server
%(prog)s --self-test # validate without server
"""
)
parser.add_argument("--players", type=int, default=DEFAULT_PLAYERS,
help=f"Number of concurrent virtual players (default: {DEFAULT_PLAYERS})")
parser.add_argument("--duration", type=float, default=DEFAULT_DURATION,
help=f"Test duration in seconds (default: {DEFAULT_DURATION})")
parser.add_argument("--actions-per-second", type=float, default=DEFAULT_ACTIONS_PER_SECOND,
help=f"Actions per second per player (default: {DEFAULT_ACTIONS_PER_SECOND})")
parser.add_argument("--host", type=str, default=DEFAULT_HOST,
help=f"MUD server host (default: {DEFAULT_HOST})")
parser.add_argument("--port", type=int, default=DEFAULT_PORT,
help=f"MUD server telnet port (default: {DEFAULT_PORT})")
parser.add_argument("--self-test", action="store_true",
help="Run self-test validation (no server required)")
parser.add_argument("--json", action="store_true",
help="Output report as JSON to stdout")
args = parser.parse_args()
if args.self_test:
success = run_self_test()
sys.exit(0 if success else 1)
# Run the stress test
runner = StressTestRunner(
host=args.host,
port=args.port,
num_players=args.players,
duration=args.duration,
actions_per_second=args.actions_per_second,
)
try:
report = asyncio.run(runner.run())
if args.json:
# Re-output as JSON
print(json.dumps({
"total_actions": report.total_actions,
"throughput": report.throughput_actions_per_sec,
"error_rate": report.error_rate_percent,
"latency_p95": report.latency_p95_ms,
}, indent=2))
except KeyboardInterrupt:
print("\n[!] Test interrupted")
sys.exit(130)
if __name__ == "__main__":
main()

View File

@@ -9,7 +9,7 @@ and configures the Public channel.
Safe to rerun (idempotent). Safe to rerun (idempotent).
Usage: Usage:
cd /root/workspace/timmy-academy cd /path/to/timmy-academy
source /root/workspace/evennia-venv/bin/activate source /root/workspace/evennia-venv/bin/activate
python world/rebuild_world.py python world/rebuild_world.py
""" """
@@ -19,7 +19,7 @@ import re
import ast import ast
os.environ["DJANGO_SETTINGS_MODULE"] = "server.conf.settings" os.environ["DJANGO_SETTINGS_MODULE"] = "server.conf.settings"
sys.path.insert(0, "/root/workspace/timmy-academy") sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import django import django
django.setup() django.setup()