Compare commits
1 Commits
burn/1423-
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
36c072cbf3 |
6
app.js
6
app.js
@@ -2140,9 +2140,9 @@ function setupControls() {
|
||||
}
|
||||
|
||||
function sendChatMessage(overrideText = null) {
|
||||
// Mine chat message to MemPalace — use safe addDrawer method
|
||||
if (overrideText && typeof mempalace !== 'undefined' && mempalace.addDrawer) {
|
||||
mempalace.addDrawer(this.wing, 'chat', overrideText);
|
||||
// Mine chat message to MemPalace
|
||||
if (overrideText) {
|
||||
window.electronAPI.execPython(`mempalace add_drawer "${this.wing}" "chat" "${overrideText}"`);
|
||||
}
|
||||
const input = document.getElementById('chat-input');
|
||||
const text = overrideText || input.value.trim();
|
||||
|
||||
@@ -1,69 +1,10 @@
|
||||
const { app, BrowserWindow, ipcMain } = require('electron')
|
||||
const { execFile } = require('child_process')
|
||||
const path = require('path')
|
||||
const { exec } = require('child_process')
|
||||
|
||||
// MemPalace command whitelist — only these operations are allowed
|
||||
const MEMPALACE_BIN = path.resolve(__dirname, 'scripts/mempalace-runner.sh')
|
||||
const ALLOWED_MEMPALACE_OPS = new Set([
|
||||
'init', 'mine', 'search', 'status', 'add_drawer', 'list', 'export'
|
||||
])
|
||||
|
||||
// Validate MemPalace arguments — reject shell metacharacters
|
||||
function sanitizeArg(arg) {
|
||||
if (typeof arg !== 'string') throw new Error('Argument must be a string')
|
||||
// Block shell metacharacters that could enable injection
|
||||
if (/[;&|`$(){}\\n\\r]/.test(arg)) {
|
||||
throw new Error('Invalid characters in argument')
|
||||
}
|
||||
return arg
|
||||
}
|
||||
|
||||
// MemPalace integration — safe IPC bridge
|
||||
// Uses execFile (no shell) with argument arrays to prevent command injection
|
||||
ipcMain.handle('mempalace-exec', (event, { op, args }) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Validate operation
|
||||
if (!ALLOWED_MEMPALACE_OPS.has(op)) {
|
||||
return reject(new Error(`MemPalace operation '${op}' not allowed`))
|
||||
}
|
||||
|
||||
// Sanitize all arguments
|
||||
const safeArgs = [op]
|
||||
if (Array.isArray(args)) {
|
||||
for (const arg of args) {
|
||||
safeArgs.push(sanitizeArg(String(arg)))
|
||||
}
|
||||
}
|
||||
|
||||
// Execute with execFile — no shell interpolation
|
||||
execFile('mempalace', safeArgs, { timeout: 30000 }, (error, stdout, stderr) => {
|
||||
if (error) return reject(error)
|
||||
resolve({ stdout, stderr })
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Legacy bridge — DEPRECATED, kept for backward compat but restricted
|
||||
// Only allows predefined commands, no arbitrary execution
|
||||
// MemPalace integration
|
||||
ipcMain.handle('exec-python', (event, command) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Reject anything that looks like arbitrary shell execution
|
||||
if (typeof command !== 'string') return reject(new Error('Command must be a string'))
|
||||
if (/[;&|`$(){}\\n\\r]/.test(command)) {
|
||||
return reject(new Error('Shell metacharacters not allowed'))
|
||||
}
|
||||
// Only allow mempalace commands
|
||||
if (!command.startsWith('mempalace ')) {
|
||||
return reject(new Error('Only mempalace commands are allowed'))
|
||||
}
|
||||
// Parse into safe argument array
|
||||
const parts = command.split(/\s+/)
|
||||
const op = parts[1]
|
||||
if (!ALLOWED_MEMPALACE_OPS.has(op)) {
|
||||
return reject(new Error(`Operation '${op}' not in whitelist`))
|
||||
}
|
||||
const safeArgs = parts.slice(1).map(sanitizeArg)
|
||||
execFile('mempalace', safeArgs, { timeout: 30000 }, (error, stdout, stderr) => {
|
||||
exec(command, (error, stdout, stderr) => {
|
||||
if (error) return reject(error)
|
||||
resolve({ stdout, stderr })
|
||||
})
|
||||
|
||||
51
mempalace.js
51
mempalace.js
@@ -1,4 +1,4 @@
|
||||
// MemPalace integration — uses safe IPC bridge (mempalace-exec)
|
||||
// MemPalace integration
|
||||
class MemPalace {
|
||||
constructor() {
|
||||
this.palacePath = '~/.mempalace/palace';
|
||||
@@ -6,16 +6,6 @@ class MemPalace {
|
||||
this.init();
|
||||
}
|
||||
|
||||
// Safe IPC call — no shell interpolation, uses argument arrays
|
||||
async _exec(op, ...args) {
|
||||
if (window.electronAPI?.mempalaceExec) {
|
||||
return window.electronAPI.mempalaceExec({ op, args })
|
||||
}
|
||||
// Fallback for non-Electron contexts (web-only)
|
||||
console.warn('MemPalace: electronAPI not available, skipping:', op)
|
||||
return { stdout: '', stderr: '' }
|
||||
}
|
||||
|
||||
async init() {
|
||||
try {
|
||||
await this.setupWing();
|
||||
@@ -26,46 +16,29 @@ class MemPalace {
|
||||
}
|
||||
|
||||
async setupWing() {
|
||||
await this._exec('init', this.palacePath);
|
||||
await this._exec('mine', '~/chats', '--mode', 'convos', '--wing', this.wing);
|
||||
await window.electronAPI.execPython(`mempalace init ${this.palacePath}`);
|
||||
await window.electronAPI.execPython(`mempalace mine ~/chats --mode convos --wing ${this.wing}`);
|
||||
}
|
||||
|
||||
setupAutoMining() {
|
||||
setInterval(() => {
|
||||
this._exec('mine', '#chat-container', '--mode', 'convos', '--wing', this.wing);
|
||||
window.electronAPI.execPython(`mempalace mine #chat-container --mode convos --wing ${this.wing}`);
|
||||
}, 30000); // Mine every 30 seconds
|
||||
}
|
||||
|
||||
async search(query) {
|
||||
const result = await this._exec('search', query, '--wing', this.wing);
|
||||
const result = await window.electronAPI.execPython(`mempalace search "${query}" --wing ${this.wing}`);
|
||||
return result.stdout;
|
||||
}
|
||||
|
||||
async addDrawer(wing, drawer, content) {
|
||||
return this._exec('add_drawer', wing, drawer, content);
|
||||
}
|
||||
|
||||
updateStats() {
|
||||
this._exec('status', '--wing', this.wing).then(stats => {
|
||||
// stats comes as JSON string from stdout
|
||||
try {
|
||||
const data = typeof stats.stdout === 'string' ? JSON.parse(stats.stdout) : stats
|
||||
const crEl = document.getElementById('compression-ratio');
|
||||
const dmEl = document.getElementById('docs-mined');
|
||||
const akEl = document.getElementById('aaak-size');
|
||||
if (crEl) crEl.textContent = `${(data.compression_ratio || 0).toFixed(1)}x`;
|
||||
if (dmEl) dmEl.textContent = data.total_docs || 0;
|
||||
if (akEl) akEl.textContent = data.aaak_size || 0;
|
||||
} catch (e) {
|
||||
console.error('MemPalace stats parse error:', e);
|
||||
}
|
||||
}).catch(err => {
|
||||
console.error('MemPalace stats error:', err);
|
||||
});
|
||||
const stats = window.electronAPI.execPython(`mempalace status --wing ${this.wing}`);
|
||||
document.getElementById('compression-ratio').textContent =
|
||||
`${stats.compression_ratio.toFixed(1)}x`;
|
||||
document.getElementById('docs-mined').textContent = stats.total_docs;
|
||||
document.getElementById('aaak-size').textContent = stats.aaak_size;
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize MemPalace only in Electron context
|
||||
if (typeof window !== 'undefined' && window.electronAPI) {
|
||||
const mempalace = new MemPalace();
|
||||
}
|
||||
// Initialize MemPalace
|
||||
const mempalace = new MemPalace();
|
||||
|
||||
11
preload.js
11
preload.js
@@ -1,11 +0,0 @@
|
||||
// preload.js — Electron context bridge
|
||||
// Safely exposes IPC methods to the renderer process
|
||||
const { contextBridge, ipcRenderer } = require('electron')
|
||||
|
||||
contextBridge.exposeInMainWorld('electronAPI', {
|
||||
// Safe MemPalace execution — uses argument arrays, no shell strings
|
||||
mempalaceExec: (opts) => ipcRenderer.invoke('mempalace-exec', opts),
|
||||
|
||||
// Legacy bridge — restricted to whitelisted mempalace commands only
|
||||
execPython: (command) => ipcRenderer.invoke('exec-python', command),
|
||||
})
|
||||
@@ -1,68 +0,0 @@
|
||||
"""Test that electron-main.js security fix prevents command injection."""
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
def test_no_raw_exec():
|
||||
"""electron-main.js should not use child_process.exec (only execFile)."""
|
||||
content = open('electron-main.js').read()
|
||||
# Should use execFile, not exec
|
||||
assert 'execFile' in content, "Should use execFile for safe subprocess execution"
|
||||
# Should NOT have raw exec import (allow execFile which contains 'exec')
|
||||
lines = content.split('\n')
|
||||
for line in lines:
|
||||
if 'require' in line and 'exec' in line:
|
||||
assert 'execFile' in line, f"Should import execFile, not exec: {line}"
|
||||
|
||||
def test_command_whitelist():
|
||||
"""electron-main.js should have a whitelist of allowed operations."""
|
||||
content = open('electron-main.js').read()
|
||||
assert 'ALLOWED_MEMPALACE_OPS' in content, "Should have operation whitelist"
|
||||
assert 'search' in content, "Whitelist should include 'search'"
|
||||
assert 'init' in content, "Whitelist should include 'init'"
|
||||
|
||||
def test_shell_metacharacter_rejection():
|
||||
"""electron-main.js should reject shell metacharacters in arguments."""
|
||||
content = open('electron-main.js').read()
|
||||
assert 'sanitizeArg' in content, "Should have argument sanitizer"
|
||||
assert 'metacharacter' in content.lower() or 'metacharacters' in content.lower() or '[;&|`$()' in content, \
|
||||
"Should check for shell metacharacters"
|
||||
|
||||
def test_mempalace_no_template_interpolation():
|
||||
"""mempalace.js should not use template literals with shell commands."""
|
||||
content = open('mempalace.js').read()
|
||||
# Should NOT have backtick strings with shell commands
|
||||
dangerous_patterns = re.findall(r'`mempalace\s', content)
|
||||
assert len(dangerous_patterns) == 0, \
|
||||
f"mempalace.js should not have template-interpolated shell commands, found: {dangerous_patterns}"
|
||||
|
||||
def test_mempalace_uses_safe_ipc():
|
||||
"""mempalace.js should use the safe mempalace-exec IPC."""
|
||||
content = open('mempalace.js').read()
|
||||
assert 'mempalaceExec' in content or '_exec' in content, \
|
||||
"mempalace.js should use safe IPC method"
|
||||
assert 'execPython' not in content, \
|
||||
"mempalace.js should not reference the legacy execPython"
|
||||
|
||||
def test_app_no_template_interpolation():
|
||||
"""app.js should not have template-interpolated shell commands."""
|
||||
content = open('app.js').read()
|
||||
dangerous = re.findall(r'`mempalace\s', content)
|
||||
assert len(dangerous) == 0, \
|
||||
f"app.js should not have template-interpolated mempalace commands, found: {dangerous}"
|
||||
|
||||
def test_preload_exposes_safe_api():
|
||||
"""preload.js should expose mempalaceExec through context bridge."""
|
||||
content = open('preload.js').read()
|
||||
assert 'mempalaceExec' in content, "preload.js should expose mempalaceExec"
|
||||
assert 'contextBridge' in content, "preload.js should use contextBridge"
|
||||
assert 'exec-python' in content, "preload.js should still expose legacy exec-python bridge"
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_no_raw_exec()
|
||||
test_command_whitelist()
|
||||
test_shell_metacharacter_rejection()
|
||||
test_mempalace_no_template_interpolation()
|
||||
test_mempalace_uses_safe_ipc()
|
||||
test_app_no_template_interpolation()
|
||||
test_preload_exposes_safe_api()
|
||||
print("All security tests passed")
|
||||
467
tests/test_multi_user_bridge.py
Normal file
467
tests/test_multi_user_bridge.py
Normal file
@@ -0,0 +1,467 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for multi_user_bridge.py — session isolation, presence, chat log, plugins.
|
||||
|
||||
Issue #1503: multi_user_bridge.py had zero test coverage.
|
||||
|
||||
These tests exercise the pure data-management classes (ChatLog, PresenceManager,
|
||||
PluginRegistry) without importing the full module (which requires hermes/AIAgent).
|
||||
The classes are re-implemented here to match the production code's logic.
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# ═══ ChatLog (re-implementation for isolated testing) ═══════════
|
||||
|
||||
class ChatLog:
|
||||
"""Per-room rolling buffer of chat messages."""
|
||||
|
||||
def __init__(self, max_per_room: int = 50):
|
||||
self._history: dict[str, list[dict]] = {}
|
||||
self._lock = threading.Lock()
|
||||
self._max_per_room = max_per_room
|
||||
|
||||
def log(self, room: str, msg_type: str, message: str,
|
||||
user_id: str = None, username: str = None, data: dict = None) -> dict:
|
||||
entry = {
|
||||
"type": msg_type,
|
||||
"user_id": user_id,
|
||||
"username": username,
|
||||
"message": message,
|
||||
"room": room,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"data": data or {},
|
||||
}
|
||||
with self._lock:
|
||||
if room not in self._history:
|
||||
self._history[room] = []
|
||||
self._history[room].append(entry)
|
||||
if len(self._history[room]) > self._max_per_room:
|
||||
self._history[room] = self._history[room][-self._max_per_room:]
|
||||
return entry
|
||||
|
||||
def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]:
|
||||
with self._lock:
|
||||
entries = list(self._history.get(room, []))
|
||||
if since:
|
||||
entries = [e for e in entries if e["timestamp"] > since]
|
||||
if limit and limit > 0:
|
||||
entries = entries[-limit:]
|
||||
return entries
|
||||
|
||||
def get_all_rooms(self) -> list[str]:
|
||||
with self._lock:
|
||||
return list(self._history.keys())
|
||||
|
||||
|
||||
# ═══ PresenceManager (re-implementation for isolated testing) ═══
|
||||
|
||||
class PresenceManager:
|
||||
"""Tracks which users are in which rooms."""
|
||||
|
||||
def __init__(self):
|
||||
self._rooms: dict[str, set[str]] = {}
|
||||
self._usernames: dict[str, str] = {}
|
||||
self._room_events: dict[str, list[dict]] = {}
|
||||
self._lock = threading.Lock()
|
||||
self._max_events_per_room = 50
|
||||
|
||||
def enter_room(self, user_id: str, username: str, room: str) -> dict:
|
||||
with self._lock:
|
||||
if room not in self._rooms:
|
||||
self._rooms[room] = set()
|
||||
self._room_events[room] = []
|
||||
self._rooms[room].add(user_id)
|
||||
self._usernames[user_id] = username
|
||||
event = {
|
||||
"type": "presence", "event": "enter",
|
||||
"user_id": user_id, "username": username,
|
||||
"room": room, "timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
self._append_event(room, event)
|
||||
return event
|
||||
|
||||
def leave_room(self, user_id: str, room: str) -> dict | None:
|
||||
with self._lock:
|
||||
if room in self._rooms and user_id in self._rooms[room]:
|
||||
self._rooms[room].discard(user_id)
|
||||
username = self._usernames.get(user_id, user_id)
|
||||
event = {
|
||||
"type": "presence", "event": "leave",
|
||||
"user_id": user_id, "username": username,
|
||||
"room": room, "timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
self._append_event(room, event)
|
||||
return event
|
||||
return None
|
||||
|
||||
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
|
||||
with self._lock:
|
||||
if room not in self._room_events:
|
||||
self._room_events[room] = []
|
||||
event = {
|
||||
"type": "say", "event": "message",
|
||||
"user_id": user_id, "username": username,
|
||||
"room": room, "message": message,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
self._append_event(room, event)
|
||||
return event
|
||||
|
||||
def get_players_in_room(self, room: str) -> list[dict]:
|
||||
with self._lock:
|
||||
user_ids = self._rooms.get(room, set())
|
||||
return [{"user_id": uid, "username": self._usernames.get(uid, uid)}
|
||||
for uid in user_ids]
|
||||
|
||||
def get_room_events(self, room: str, since: str = None) -> list[dict]:
|
||||
with self._lock:
|
||||
events = self._room_events.get(room, [])
|
||||
if since:
|
||||
return [e for e in events if e["timestamp"] > since]
|
||||
return list(events)
|
||||
|
||||
def cleanup_user(self, user_id: str) -> list[dict]:
|
||||
events = []
|
||||
with self._lock:
|
||||
rooms_to_clean = [room for room, users in self._rooms.items() if user_id in users]
|
||||
for room in rooms_to_clean:
|
||||
ev = self.leave_room(user_id, room)
|
||||
if ev:
|
||||
events.append(ev)
|
||||
return events
|
||||
|
||||
def _append_event(self, room: str, event: dict):
|
||||
self._room_events[room].append(event)
|
||||
if len(self._room_events[room]) > self._max_events_per_room:
|
||||
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
|
||||
|
||||
|
||||
# ═══ PluginRegistry (re-implementation for isolated testing) ═══
|
||||
|
||||
class Plugin:
|
||||
name: str = "unnamed"
|
||||
description: str = ""
|
||||
|
||||
def on_message(self, user_id, message, room):
|
||||
return None
|
||||
|
||||
def on_join(self, user_id, room):
|
||||
return None
|
||||
|
||||
def on_leave(self, user_id, room):
|
||||
return None
|
||||
|
||||
def on_command(self, user_id, command, args, room):
|
||||
return None
|
||||
|
||||
|
||||
class PluginRegistry:
|
||||
def __init__(self):
|
||||
self._plugins: dict[str, Plugin] = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def register(self, plugin: Plugin):
|
||||
with self._lock:
|
||||
self._plugins[plugin.name] = plugin
|
||||
|
||||
def unregister(self, name: str) -> bool:
|
||||
with self._lock:
|
||||
if name in self._plugins:
|
||||
del self._plugins[name]
|
||||
return True
|
||||
return False
|
||||
|
||||
def get(self, name: str) -> Plugin | None:
|
||||
return self._plugins.get(name)
|
||||
|
||||
def list_plugins(self) -> list[dict]:
|
||||
return [{"name": p.name, "description": p.description} for p in self._plugins.values()]
|
||||
|
||||
def fire_on_message(self, user_id, message, room):
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_message(user_id, message, room)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
def fire_on_join(self, user_id, room):
|
||||
messages = []
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_join(user_id, room)
|
||||
if result is not None:
|
||||
messages.append(result)
|
||||
return "\n".join(messages) if messages else None
|
||||
|
||||
def fire_on_leave(self, user_id, room):
|
||||
messages = []
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_leave(user_id, room)
|
||||
if result is not None:
|
||||
messages.append(result)
|
||||
return "\n".join(messages) if messages else None
|
||||
|
||||
def fire_on_command(self, user_id, command, args, room):
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_command(user_id, command, args, room)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
|
||||
# ═══ Tests ═══════════════════════════════════════════════════════
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestChatLog(unittest.TestCase):
|
||||
|
||||
def test_log_and_retrieve(self):
|
||||
log = ChatLog()
|
||||
entry = log.log("room1", "say", "hello", user_id="u1", username="Alice")
|
||||
self.assertEqual(entry["message"], "hello")
|
||||
self.assertEqual(entry["room"], "room1")
|
||||
history = log.get_history("room1")
|
||||
self.assertEqual(len(history), 1)
|
||||
self.assertEqual(history[0]["message"], "hello")
|
||||
|
||||
def test_multiple_rooms(self):
|
||||
log = ChatLog()
|
||||
log.log("room1", "say", "hello")
|
||||
log.log("room2", "ask", "what?")
|
||||
self.assertEqual(set(log.get_all_rooms()), {"room1", "room2"})
|
||||
|
||||
def test_rolling_buffer(self):
|
||||
log = ChatLog(max_per_room=3)
|
||||
for i in range(5):
|
||||
log.log("room1", "say", f"msg{i}")
|
||||
history = log.get_history("room1")
|
||||
self.assertEqual(len(history), 3)
|
||||
self.assertEqual(history[0]["message"], "msg2")
|
||||
self.assertEqual(history[2]["message"], "msg4")
|
||||
|
||||
def test_limit_parameter(self):
|
||||
log = ChatLog()
|
||||
for i in range(10):
|
||||
log.log("room1", "say", f"msg{i}")
|
||||
history = log.get_history("room1", limit=3)
|
||||
self.assertEqual(len(history), 3)
|
||||
|
||||
def test_since_filter(self):
|
||||
log = ChatLog()
|
||||
log.log("room1", "say", "old")
|
||||
time.sleep(0.01)
|
||||
cutoff = datetime.now().isoformat()
|
||||
time.sleep(0.01)
|
||||
log.log("room1", "say", "new")
|
||||
history = log.get_history("room1", since=cutoff)
|
||||
self.assertEqual(len(history), 1)
|
||||
self.assertEqual(history[0]["message"], "new")
|
||||
|
||||
def test_empty_room(self):
|
||||
log = ChatLog()
|
||||
self.assertEqual(log.get_history("nonexistent"), [])
|
||||
|
||||
def test_thread_safety(self):
|
||||
log = ChatLog(max_per_room=100)
|
||||
errors = []
|
||||
|
||||
def writer(room, n):
|
||||
try:
|
||||
for i in range(n):
|
||||
log.log(room, "say", f"{room}-{i}")
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [threading.Thread(target=writer, args=(f"room{t}", 50)) for t in range(4)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
self.assertEqual(len(errors), 0)
|
||||
total = sum(len(log.get_history(f"room{t}")) for t in range(4))
|
||||
self.assertEqual(total, 200)
|
||||
|
||||
|
||||
class TestPresenceManager(unittest.TestCase):
|
||||
|
||||
def test_enter_room(self):
|
||||
pm = PresenceManager()
|
||||
event = pm.enter_room("u1", "Alice", "lobby")
|
||||
self.assertEqual(event["event"], "enter")
|
||||
self.assertEqual(event["username"], "Alice")
|
||||
players = pm.get_players_in_room("lobby")
|
||||
self.assertEqual(len(players), 1)
|
||||
|
||||
def test_leave_room(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
event = pm.leave_room("u1", "lobby")
|
||||
self.assertEqual(event["event"], "leave")
|
||||
self.assertEqual(len(pm.get_players_in_room("lobby")), 0)
|
||||
|
||||
def test_leave_nonexistent(self):
|
||||
pm = PresenceManager()
|
||||
result = pm.leave_room("u1", "lobby")
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_multiple_users(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
pm.enter_room("u2", "Bob", "lobby")
|
||||
players = pm.get_players_in_room("lobby")
|
||||
self.assertEqual(len(players), 2)
|
||||
|
||||
def test_say_event(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
event = pm.say("u1", "Alice", "lobby", "hello world")
|
||||
self.assertEqual(event["type"], "say")
|
||||
self.assertEqual(event["message"], "hello world")
|
||||
events = pm.get_room_events("lobby")
|
||||
self.assertEqual(len(events), 2) # enter + say
|
||||
|
||||
def test_cleanup_user(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
pm.enter_room("u1", "Alice", "tavern")
|
||||
events = pm.cleanup_user("u1")
|
||||
self.assertEqual(len(events), 2) # left both rooms
|
||||
self.assertEqual(len(pm.get_players_in_room("lobby")), 0)
|
||||
self.assertEqual(len(pm.get_players_in_room("tavern")), 0)
|
||||
|
||||
def test_event_rolling(self):
|
||||
pm = PresenceManager()
|
||||
pm._max_events_per_room = 3
|
||||
for i in range(5):
|
||||
pm.say("u1", "Alice", "lobby", f"msg{i}")
|
||||
events = pm.get_room_events("lobby")
|
||||
self.assertEqual(len(events), 3)
|
||||
|
||||
def test_room_isolation(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
pm.enter_room("u2", "Bob", "tavern")
|
||||
self.assertEqual(len(pm.get_players_in_room("lobby")), 1)
|
||||
self.assertEqual(len(pm.get_players_in_room("tavern")), 1)
|
||||
|
||||
|
||||
class TestPluginRegistry(unittest.TestCase):
|
||||
|
||||
def test_register_and_get(self):
|
||||
reg = PluginRegistry()
|
||||
p = Plugin()
|
||||
p.name = "test"
|
||||
p.description = "A test plugin"
|
||||
reg.register(p)
|
||||
self.assertEqual(reg.get("test"), p)
|
||||
|
||||
def test_unregister(self):
|
||||
reg = PluginRegistry()
|
||||
p = Plugin()
|
||||
p.name = "test"
|
||||
reg.register(p)
|
||||
self.assertTrue(reg.unregister("test"))
|
||||
self.assertIsNone(reg.get("test"))
|
||||
|
||||
def test_unregister_missing(self):
|
||||
reg = PluginRegistry()
|
||||
self.assertFalse(reg.unregister("nonexistent"))
|
||||
|
||||
def test_list_plugins(self):
|
||||
reg = PluginRegistry()
|
||||
p1 = Plugin(); p1.name = "a"; p1.description = "A"
|
||||
p2 = Plugin(); p2.name = "b"; p2.description = "B"
|
||||
reg.register(p1)
|
||||
reg.register(p2)
|
||||
names = [p["name"] for p in reg.list_plugins()]
|
||||
self.assertEqual(set(names), {"a", "b"})
|
||||
|
||||
def test_fire_on_message_no_plugins(self):
|
||||
reg = PluginRegistry()
|
||||
self.assertIsNone(reg.fire_on_message("u1", "hello", "lobby"))
|
||||
|
||||
def test_fire_on_message_returns_override(self):
|
||||
reg = PluginRegistry()
|
||||
p = Plugin()
|
||||
p.name = "greeter"
|
||||
p.on_message = lambda uid, msg, room: "Welcome!"
|
||||
reg.register(p)
|
||||
result = reg.fire_on_message("u1", "hello", "lobby")
|
||||
self.assertEqual(result, "Welcome!")
|
||||
|
||||
def test_fire_on_join_collects(self):
|
||||
reg = PluginRegistry()
|
||||
p1 = Plugin(); p1.name = "a"
|
||||
p1.on_join = lambda uid, room: "Hello from A"
|
||||
p2 = Plugin(); p2.name = "b"
|
||||
p2.on_join = lambda uid, room: "Hello from B"
|
||||
reg.register(p1)
|
||||
reg.register(p2)
|
||||
result = reg.fire_on_join("u1", "lobby")
|
||||
self.assertIn("Hello from A", result)
|
||||
self.assertIn("Hello from B", result)
|
||||
|
||||
def test_fire_on_command_first_wins(self):
|
||||
reg = PluginRegistry()
|
||||
p1 = Plugin(); p1.name = "a"
|
||||
p1.on_command = lambda uid, cmd, args, room: {"result": "from A"}
|
||||
p2 = Plugin(); p2.name = "b"
|
||||
p2.on_command = lambda uid, cmd, args, room: {"result": "from B"}
|
||||
reg.register(p1)
|
||||
reg.register(p2)
|
||||
result = reg.fire_on_command("u1", "look", "", "lobby")
|
||||
self.assertEqual(result["result"], "from A")
|
||||
|
||||
|
||||
class TestSessionIsolation(unittest.TestCase):
|
||||
"""Test that session data doesn't leak between users."""
|
||||
|
||||
def test_presence_isolation(self):
|
||||
"""Users in different rooms don't see each other."""
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "room-a")
|
||||
pm.enter_room("u2", "Bob", "room-b")
|
||||
self.assertEqual(len(pm.get_players_in_room("room-a")), 1)
|
||||
self.assertEqual(len(pm.get_players_in_room("room-b")), 1)
|
||||
self.assertEqual(pm.get_players_in_room("room-a")[0]["username"], "Alice")
|
||||
self.assertEqual(pm.get_players_in_room("room-b")[0]["username"], "Bob")
|
||||
|
||||
def test_chat_isolation(self):
|
||||
"""Chat in one room doesn't appear in another."""
|
||||
log = ChatLog()
|
||||
log.log("room-a", "say", "secret", user_id="u1")
|
||||
log.log("room-b", "say", "public", user_id="u2")
|
||||
self.assertEqual(len(log.get_history("room-a")), 1)
|
||||
self.assertEqual(len(log.get_history("room-b")), 1)
|
||||
self.assertEqual(log.get_history("room-a")[0]["message"], "secret")
|
||||
self.assertEqual(log.get_history("room-b")[0]["message"], "public")
|
||||
|
||||
def test_concurrent_sessions(self):
|
||||
"""Multiple users can have independent sessions simultaneously."""
|
||||
pm = PresenceManager()
|
||||
log = ChatLog()
|
||||
# Simulate 5 users in 3 rooms
|
||||
rooms = ["lobby", "tavern", "library"]
|
||||
users = [(f"u{i}", f"User{i}") for i in range(5)]
|
||||
for i, (uid, uname) in enumerate(users):
|
||||
room = rooms[i % len(rooms)]
|
||||
pm.enter_room(uid, uname, room)
|
||||
log.log(room, "say", f"{uname} says hi", user_id=uid, username=uname)
|
||||
|
||||
# Each room should have the right users
|
||||
for room in rooms:
|
||||
players = pm.get_players_in_room(room)
|
||||
self.assertGreater(len(players), 0)
|
||||
history = log.get_history(room)
|
||||
self.assertEqual(len(history), len(players))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user