Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b25ded9f6 |
@@ -395,6 +395,7 @@
|
||||
<div id="memory-connections-panel" class="memory-connections-panel" style="display:none;" aria-label="Memory Connections Panel"></div>
|
||||
|
||||
<script src="./boot.js"></script>
|
||||
<script src="./js/heartbeat.js"></script>
|
||||
<script>
|
||||
function openMemoryFilter() { renderFilterList(); document.getElementById('memory-filter').style.display = 'flex'; }
|
||||
function closeMemoryFilter() { document.getElementById('memory-filter').style.display = 'none'; }
|
||||
|
||||
149
js/heartbeat-patch.js
Normal file
149
js/heartbeat-patch.js
Normal file
@@ -0,0 +1,149 @@
|
||||
/**
|
||||
* Patch for app.js to add WebSocket heartbeat functionality
|
||||
* Issue #1535: feat: WebSocket heartbeat with auto-reconnect from client
|
||||
*/
|
||||
|
||||
// Add this to the top of app.js or in a separate module import
|
||||
// import NexusHeartbeat from './heartbeat.js';
|
||||
|
||||
// Modified connectHermes function with heartbeat support
|
||||
function connectHermes() {
|
||||
// Initialize MemPalace before Hermes connection
|
||||
initializeMemPalace();
|
||||
|
||||
if (hermesWs) return;
|
||||
|
||||
// Initialize MemPalace storage
|
||||
try {
|
||||
console.log('Initializing MemPalace memory system...');
|
||||
const statusEl = document.getElementById('mem-palace-status');
|
||||
if (statusEl) {
|
||||
statusEl.textContent = 'MEMPALACE INITIALIZING';
|
||||
statusEl.style.color = '#4af0c0';
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to initialize MemPalace:', err);
|
||||
const statusEl = document.getElementById('mem-palace-status');
|
||||
if (statusEl) {
|
||||
statusEl.textContent = 'MEMPALACE ERROR';
|
||||
statusEl.style.color = '#ff4466';
|
||||
}
|
||||
}
|
||||
|
||||
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
const wsUrl = `${protocol}//${window.location.host}/api/world/ws`;
|
||||
|
||||
console.log(`Connecting to Hermes at ${wsUrl}...`);
|
||||
|
||||
// Create heartbeat client
|
||||
const heartbeat = new NexusHeartbeat({
|
||||
heartbeatInterval: 30000, // 30 seconds
|
||||
maxMissedHeartbeats: 2,
|
||||
onConnect: () => {
|
||||
console.log('Hermes connected with heartbeat.');
|
||||
wsConnected = true;
|
||||
addChatMessage('system', 'Hermes link established.');
|
||||
updateWsHudStatus(true);
|
||||
refreshWorkshopPanel();
|
||||
},
|
||||
onDisconnect: (event) => {
|
||||
console.warn('Hermes disconnected:', event.code, event.reason);
|
||||
wsConnected = false;
|
||||
updateWsHudStatus(false);
|
||||
refreshWorkshopPanel();
|
||||
},
|
||||
onHeartbeat: (data) => {
|
||||
// Update HUD with user count
|
||||
if (data.user_count !== undefined) {
|
||||
const userCountEl = document.getElementById('user-count');
|
||||
if (userCountEl) {
|
||||
userCountEl.textContent = data.user_count;
|
||||
}
|
||||
}
|
||||
},
|
||||
onUserCount: (count) => {
|
||||
console.log(`Connected users: ${count}`);
|
||||
// Update any UI elements showing user count
|
||||
},
|
||||
onError: (error) => {
|
||||
console.error('Hermes WS error:', error);
|
||||
}
|
||||
});
|
||||
|
||||
// Store heartbeat client globally
|
||||
window.nexusHeartbeat = heartbeat;
|
||||
|
||||
// Connect with heartbeat
|
||||
heartbeat.connect(wsUrl);
|
||||
|
||||
// For backward compatibility, create a wrapper that looks like the old WebSocket
|
||||
hermesWs = {
|
||||
send: (data) => {
|
||||
if (heartbeat.isConnectionActive()) {
|
||||
heartbeat.ws.send(data);
|
||||
}
|
||||
},
|
||||
close: () => {
|
||||
heartbeat.disconnect();
|
||||
},
|
||||
get readyState() {
|
||||
return heartbeat.isConnectionActive() ? WebSocket.OPEN : WebSocket.CLOSED;
|
||||
}
|
||||
};
|
||||
|
||||
// Initialize MemPalace
|
||||
connectMemPalace();
|
||||
|
||||
// Mnemosyne: request memory sync from Hermes when connected
|
||||
const waitForConnection = setInterval(() => {
|
||||
if (heartbeat.isConnectionActive()) {
|
||||
clearInterval(waitForConnection);
|
||||
try {
|
||||
heartbeat.ws.send(JSON.stringify({ type: 'memory', action: 'sync_request' }));
|
||||
console.info('[Mnemosyne] Sent sync_request to Hermes');
|
||||
} catch (e) {
|
||||
console.warn('[Mnemosyne] Failed to send sync_request:', e);
|
||||
}
|
||||
}
|
||||
}, 100);
|
||||
}
|
||||
|
||||
// Update position when user moves
|
||||
function updateUserPosition(x, y, z) {
|
||||
if (window.nexusHeartbeat) {
|
||||
window.nexusHeartbeat.updatePosition(x, y, z);
|
||||
}
|
||||
}
|
||||
|
||||
// Get heartbeat stats
|
||||
function getHeartbeatStats() {
|
||||
if (window.nexusHeartbeat) {
|
||||
return window.nexusHeartbeat.getStats();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Add heartbeat status to HUD
|
||||
function updateHeartbeatStatus() {
|
||||
const stats = getHeartbeatStats();
|
||||
if (!stats) return;
|
||||
|
||||
const statusEl = document.getElementById('heartbeat-status');
|
||||
if (statusEl) {
|
||||
statusEl.textContent = stats.connected ? 'HEARTBEAT: ACTIVE' : 'HEARTBEAT: INACTIVE';
|
||||
statusEl.style.color = stats.connected ? '#4af0c0' : '#ff4466';
|
||||
}
|
||||
|
||||
const missedEl = document.getElementById('missed-heartbeats');
|
||||
if (missedEl) {
|
||||
missedEl.textContent = `Missed: ${stats.missedHeartbeats}`;
|
||||
}
|
||||
}
|
||||
|
||||
// Call this in the game loop or on position updates
|
||||
setInterval(updateHeartbeatStatus, 1000);
|
||||
|
||||
// Add to HTML:
|
||||
// <div id="heartbeat-status" style="position: fixed; bottom: 10px; right: 10px; color: #4af0c0;"></div>
|
||||
// <div id="missed-heartbeats" style="position: fixed; bottom: 30px; right: 10px; color: #888;"></div>
|
||||
// <div id="user-count" style="position: fixed; bottom: 50px; right: 10px; color: #4af0c0;"></div>
|
||||
293
js/heartbeat.js
Normal file
293
js/heartbeat.js
Normal file
@@ -0,0 +1,293 @@
|
||||
/**
|
||||
* WebSocket Heartbeat Client for The Nexus
|
||||
* Issue #1535: feat: WebSocket heartbeat with auto-reconnect from client
|
||||
*
|
||||
* Provides:
|
||||
* - Client sends heartbeat ping every 30s
|
||||
* - Server responds with pong + user count
|
||||
* - Client auto-reconnects on missed 2 heartbeats
|
||||
* - Reconnect preserves user position/identity
|
||||
*/
|
||||
|
||||
class NexusHeartbeat {
|
||||
constructor(options = {}) {
|
||||
this.heartbeatInterval = options.heartbeatInterval || 30000; // 30 seconds
|
||||
this.maxMissedHeartbeats = options.maxMissedHeartbeats || 2;
|
||||
this.reconnectDelay = options.reconnectDelay || 1000; // 1 second
|
||||
this.maxReconnectDelay = options.maxReconnectDelay || 30000; // 30 seconds
|
||||
|
||||
this.ws = null;
|
||||
this.heartbeatTimer = null;
|
||||
this.missedHeartbeats = 0;
|
||||
this.isConnected = false;
|
||||
this.userId = options.userId || this.generateUserId();
|
||||
this.position = options.position || { x: 0, y: 0, z: 0 };
|
||||
this.reconnectAttempts = 0;
|
||||
|
||||
// Callbacks
|
||||
this.onConnect = options.onConnect || (() => {});
|
||||
this.onDisconnect = options.onDisconnect || (() => {});
|
||||
this.onHeartbeat = options.onHeartbeat || (() => {});
|
||||
this.onUserCount = options.onUserCount || (() => {});
|
||||
this.onError = options.onError || console.error;
|
||||
|
||||
// Bind methods
|
||||
this.connect = this.connect.bind(this);
|
||||
this.disconnect = this.disconnect.bind(this);
|
||||
this.sendHeartbeat = this.sendHeartbeat.bind(this);
|
||||
this.handleMessage = this.handleMessage.bind(this);
|
||||
this.handleClose = this.handleClose.bind(this);
|
||||
this.handleError = this.handleError.bind(this);
|
||||
}
|
||||
|
||||
generateUserId() {
|
||||
return 'user_' + Math.random().toString(36).substr(2, 9);
|
||||
}
|
||||
|
||||
connect(url) {
|
||||
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||
console.warn('Already connected');
|
||||
return;
|
||||
}
|
||||
|
||||
this.url = url;
|
||||
console.log(`Connecting to ${url}...`);
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(url);
|
||||
this.ws.onopen = this.handleOpen.bind(this);
|
||||
this.ws.onmessage = this.handleMessage;
|
||||
this.ws.onclose = this.handleClose;
|
||||
this.ws.onerror = this.handleError;
|
||||
} catch (error) {
|
||||
this.onError('Failed to create WebSocket:', error);
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
console.log('Disconnecting...');
|
||||
|
||||
// Stop heartbeat
|
||||
this.stopHeartbeat();
|
||||
|
||||
// Close WebSocket
|
||||
if (this.ws) {
|
||||
this.ws.onclose = null; // Prevent reconnect on manual disconnect
|
||||
this.ws.close(1000, 'Manual disconnect');
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
this.isConnected = false;
|
||||
this.missedHeartbeats = 0;
|
||||
this.reconnectAttempts = 0;
|
||||
}
|
||||
|
||||
handleOpen() {
|
||||
console.log('Connected to WebSocket');
|
||||
this.isConnected = true;
|
||||
this.missedHeartbeats = 0;
|
||||
this.reconnectAttempts = 0;
|
||||
|
||||
// Send reconnect message with user info
|
||||
this.sendReconnect();
|
||||
|
||||
// Start heartbeat
|
||||
this.startHeartbeat();
|
||||
|
||||
// Call connect callback
|
||||
this.onConnect();
|
||||
}
|
||||
|
||||
handleMessage(event) {
|
||||
try {
|
||||
const data = JSON.parse(event.data);
|
||||
|
||||
if (data.type === 'pong') {
|
||||
// Reset missed heartbeats
|
||||
this.missedHeartbeats = 0;
|
||||
|
||||
// Update user count
|
||||
if (data.user_count !== undefined) {
|
||||
this.onUserCount(data.user_count);
|
||||
}
|
||||
|
||||
// Call heartbeat callback
|
||||
this.onHeartbeat(data);
|
||||
|
||||
console.debug('Heartbeat pong received');
|
||||
} else if (data.type === 'health') {
|
||||
// Health check response
|
||||
console.debug('Health check:', data);
|
||||
} else {
|
||||
// Regular message
|
||||
console.debug('Message received:', data);
|
||||
}
|
||||
} catch (error) {
|
||||
// Not JSON or parse error
|
||||
console.debug('Non-JSON message received:', event.data);
|
||||
}
|
||||
}
|
||||
|
||||
handleClose(event) {
|
||||
console.log(`WebSocket closed: ${event.code} ${event.reason}`);
|
||||
this.isConnected = false;
|
||||
this.stopHeartbeat();
|
||||
|
||||
// Call disconnect callback
|
||||
this.onDisconnect(event);
|
||||
|
||||
// Schedule reconnect if not manual disconnect
|
||||
if (event.code !== 1000) {
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
}
|
||||
|
||||
handleError(error) {
|
||||
this.onError('WebSocket error:', error);
|
||||
}
|
||||
|
||||
startHeartbeat() {
|
||||
if (this.heartbeatTimer) {
|
||||
clearInterval(this.heartbeatTimer);
|
||||
}
|
||||
|
||||
console.log(`Starting heartbeat every ${this.heartbeatInterval / 1000}s`);
|
||||
|
||||
this.heartbeatTimer = setInterval(() => {
|
||||
this.sendHeartbeat();
|
||||
}, this.heartbeatInterval);
|
||||
|
||||
// Send initial heartbeat
|
||||
this.sendHeartbeat();
|
||||
}
|
||||
|
||||
stopHeartbeat() {
|
||||
if (this.heartbeatTimer) {
|
||||
clearInterval(this.heartbeatTimer);
|
||||
this.heartbeatTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
sendHeartbeat() {
|
||||
if (!this.isConnected || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
console.warn('Cannot send heartbeat: not connected');
|
||||
return;
|
||||
}
|
||||
|
||||
const heartbeat = {
|
||||
type: 'heartbeat',
|
||||
timestamp: Date.now(),
|
||||
user_id: this.userId,
|
||||
position: this.position
|
||||
};
|
||||
|
||||
try {
|
||||
this.ws.send(JSON.stringify(heartbeat));
|
||||
console.debug('Heartbeat sent');
|
||||
|
||||
// Check for missed heartbeats
|
||||
this.missedHeartbeats++;
|
||||
if (this.missedHeartbeats > this.maxMissedHeartbeats) {
|
||||
console.warn(`Missed ${this.missedHeartbeats} heartbeats, reconnecting...`);
|
||||
this.ws.close(4000, 'Missed heartbeats');
|
||||
}
|
||||
} catch (error) {
|
||||
this.onError('Failed to send heartbeat:', error);
|
||||
this.ws.close(4001, 'Heartbeat send failed');
|
||||
}
|
||||
}
|
||||
|
||||
sendReconnect() {
|
||||
if (!this.isConnected || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
console.warn('Cannot send reconnect: not connected');
|
||||
return;
|
||||
}
|
||||
|
||||
const reconnect = {
|
||||
type: 'reconnect',
|
||||
timestamp: Date.now(),
|
||||
user_id: this.userId,
|
||||
position: this.position
|
||||
};
|
||||
|
||||
try {
|
||||
this.ws.send(JSON.stringify(reconnect));
|
||||
console.log('Reconnect message sent');
|
||||
} catch (error) {
|
||||
this.onError('Failed to send reconnect:', error);
|
||||
}
|
||||
}
|
||||
|
||||
scheduleReconnect() {
|
||||
if (this.reconnectAttempts >= 10) {
|
||||
console.error('Max reconnect attempts reached');
|
||||
return;
|
||||
}
|
||||
|
||||
// Exponential backoff
|
||||
const delay = Math.min(
|
||||
this.reconnectDelay * Math.pow(2, this.reconnectAttempts),
|
||||
this.maxReconnectDelay
|
||||
);
|
||||
|
||||
console.log(`Reconnecting in ${delay / 1000}s (attempt ${this.reconnectAttempts + 1})...`);
|
||||
|
||||
setTimeout(() => {
|
||||
this.reconnectAttempts++;
|
||||
this.connect(this.url);
|
||||
}, delay);
|
||||
}
|
||||
|
||||
updatePosition(x, y, z) {
|
||||
this.position = { x, y, z };
|
||||
|
||||
// Send position update if connected
|
||||
if (this.isConnected && this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||
const update = {
|
||||
type: 'position',
|
||||
timestamp: Date.now(),
|
||||
user_id: this.userId,
|
||||
position: this.position
|
||||
};
|
||||
|
||||
try {
|
||||
this.ws.send(JSON.stringify(update));
|
||||
} catch (error) {
|
||||
console.warn('Failed to send position update:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
getUserId() {
|
||||
return this.userId;
|
||||
}
|
||||
|
||||
getPosition() {
|
||||
return { ...this.position };
|
||||
}
|
||||
|
||||
isConnectionActive() {
|
||||
return this.isConnected && this.ws && this.ws.readyState === WebSocket.OPEN;
|
||||
}
|
||||
|
||||
getStats() {
|
||||
return {
|
||||
connected: this.isConnected,
|
||||
userId: this.userId,
|
||||
position: this.position,
|
||||
missedHeartbeats: this.missedHeartbeats,
|
||||
reconnectAttempts: this.reconnectAttempts
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Export for use in other modules
|
||||
if (typeof module !== 'undefined' && module.exports) {
|
||||
module.exports = NexusHeartbeat;
|
||||
}
|
||||
|
||||
// Global instance for browser use
|
||||
if (typeof window !== 'undefined') {
|
||||
window.NexusHeartbeat = NexusHeartbeat;
|
||||
}
|
||||
@@ -29,7 +29,7 @@ from typing import Any, Callable, Optional
|
||||
|
||||
import websockets
|
||||
|
||||
from nexus.bannerlord_trace import BannerlordTraceLogger
|
||||
from bannerlord_trace import BannerlordTraceLogger
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# CONFIGURATION
|
||||
|
||||
@@ -181,63 +181,6 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
|
||||
|
||||
def clean_lines(text: str) -> list[str]:
|
||||
"""Strip ANSI, normalize line endings, return non-empty lines."""
|
||||
text = strip_ansi(text).replace("\r", "")
|
||||
return [line.strip() for line in text.split("\n") if line.strip()]
|
||||
|
||||
|
||||
def parse_room_output(text: str) -> dict:
|
||||
"""Parse Evennia room text into structured data (title, desc, exits, objects)."""
|
||||
lines = clean_lines(text)
|
||||
if len(lines) < 2:
|
||||
return {"title": lines[0] if lines else "", "desc": "", "exits": [], "objects": []}
|
||||
title = lines[0]
|
||||
desc = lines[1]
|
||||
exits = []
|
||||
objects = []
|
||||
for line in lines[2:]:
|
||||
if line.startswith("Exits:"):
|
||||
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
|
||||
exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()]
|
||||
elif line.startswith("You see:"):
|
||||
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
|
||||
parts = [t.strip() for t in raw.split(",") if t.strip()]
|
||||
objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts]
|
||||
return {"title": title, "desc": desc, "exits": exits, "objects": objects}
|
||||
|
||||
|
||||
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
|
||||
"""Convert raw Evennia event dict into normalized Nexus events."""
|
||||
from nexus.evennia_event_adapter import (
|
||||
actor_located, command_issued, command_result,
|
||||
room_snapshot, session_bound,
|
||||
)
|
||||
out = []
|
||||
event = raw.get("event")
|
||||
actor = raw.get("actor", "Timmy")
|
||||
timestamp = raw.get("timestamp")
|
||||
if event == "connect":
|
||||
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
|
||||
parsed = parse_room_output(raw.get("output", ""))
|
||||
if parsed:
|
||||
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
|
||||
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
|
||||
elif event == "command":
|
||||
cmd = raw.get("command", "")
|
||||
output = raw.get("output", "")
|
||||
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
|
||||
success = not output.startswith("Command '") and not output.startswith("Could not find")
|
||||
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
|
||||
parsed = parse_room_output(output)
|
||||
if parsed:
|
||||
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
|
||||
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
|
||||
return out
|
||||
|
||||
|
||||
async def playback(log_path: Path, ws_url: str):
|
||||
"""Legacy mode: replay a telemetry JSONL file."""
|
||||
from nexus.evennia_event_adapter import (
|
||||
|
||||
227
nexus/heartbeat_manager.py
Normal file
227
nexus/heartbeat_manager.py
Normal file
@@ -0,0 +1,227 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
WebSocket Heartbeat Manager for The Nexus
|
||||
Issue #1535: feat: WebSocket heartbeat with auto-reconnect from client
|
||||
|
||||
Provides:
|
||||
- Client sends heartbeat ping every 30s
|
||||
- Server responds with pong + user count
|
||||
- Client auto-reconnects on missed 2 heartbeats
|
||||
- Reconnect preserves user position/identity
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
from typing import Dict, Set, Optional, Any
|
||||
from dataclasses import dataclass, asdict
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger("nexus-heartbeat")
|
||||
|
||||
class HeartbeatType(Enum):
|
||||
PING = "ping"
|
||||
PONG = "pong"
|
||||
RECONNECT = "reconnect"
|
||||
|
||||
@dataclass
|
||||
class HeartbeatMessage:
|
||||
type: HeartbeatType
|
||||
timestamp: float
|
||||
user_count: Optional[int] = None
|
||||
user_id: Optional[str] = None
|
||||
position: Optional[Dict[str, float]] = None
|
||||
|
||||
def to_json(self) -> str:
|
||||
data = asdict(self)
|
||||
# Convert enum to string value
|
||||
data['type'] = self.type.value
|
||||
return json.dumps(data, default=str)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_str: str) -> 'HeartbeatMessage':
|
||||
data = json.loads(json_str)
|
||||
data['type'] = HeartbeatType(data['type'])
|
||||
return cls(**data)
|
||||
|
||||
class HeartbeatManager:
|
||||
def __init__(self, heartbeat_interval: float = 30.0, max_missed: int = 2):
|
||||
self.heartbeat_interval = heartbeat_interval
|
||||
self.max_missed = max_missed
|
||||
self.clients: Dict[Any, Dict[str, Any]] = {} # websocket -> client info
|
||||
self.heartbeat_tasks: Dict[Any, asyncio.Task] = {}
|
||||
self.last_heartbeat: Dict[Any, float] = {}
|
||||
self.missed_heartbeats: Dict[Any, int] = {}
|
||||
|
||||
async def register_client(self, websocket, user_id: str = None, position: Dict[str, float] = None):
|
||||
"""Register a new client for heartbeat monitoring."""
|
||||
self.clients[websocket] = {
|
||||
'user_id': user_id or f"user_{id(websocket)}",
|
||||
'position': position or {'x': 0, 'y': 0, 'z': 0},
|
||||
'connected_at': time.time(),
|
||||
'last_activity': time.time()
|
||||
}
|
||||
self.last_heartbeat[websocket] = time.time()
|
||||
self.missed_heartbeats[websocket] = 0
|
||||
|
||||
# Start heartbeat task for this client
|
||||
task = asyncio.create_task(self._heartbeat_loop(websocket))
|
||||
self.heartbeat_tasks[websocket] = task
|
||||
|
||||
logger.info(f"Client registered for heartbeat: {self.clients[websocket]['user_id']}")
|
||||
|
||||
# Send initial pong with user count
|
||||
await self._send_pong(websocket)
|
||||
|
||||
async def unregister_client(self, websocket):
|
||||
"""Unregister a client from heartbeat monitoring."""
|
||||
if websocket in self.clients:
|
||||
user_id = self.clients[websocket]['user_id']
|
||||
logger.info(f"Client unregistered from heartbeat: {user_id}")
|
||||
|
||||
# Cancel heartbeat task
|
||||
if websocket in self.heartbeat_tasks:
|
||||
self.heartbeat_tasks[websocket].cancel()
|
||||
try:
|
||||
await self.heartbeat_tasks[websocket]
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
del self.heartbeat_tasks[websocket]
|
||||
|
||||
# Clean up
|
||||
del self.clients[websocket]
|
||||
self.last_heartbeat.pop(websocket, None)
|
||||
self.missed_heartbeats.pop(websocket, None)
|
||||
|
||||
async def handle_message(self, websocket, message: str):
|
||||
"""Handle incoming WebSocket message."""
|
||||
try:
|
||||
data = json.loads(message)
|
||||
msg_type = data.get('type')
|
||||
|
||||
if msg_type == 'heartbeat':
|
||||
await self._handle_heartbeat(websocket, data)
|
||||
elif msg_type == 'reconnect':
|
||||
await self._handle_reconnect(websocket, data)
|
||||
else:
|
||||
# Update last activity for non-heartbeat messages
|
||||
if websocket in self.clients:
|
||||
self.clients[websocket]['last_activity'] = time.time()
|
||||
|
||||
except json.JSONDecodeError:
|
||||
# Not a JSON message, ignore
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling message: {e}")
|
||||
|
||||
async def _handle_heartbeat(self, websocket, data: Dict[str, Any]):
|
||||
"""Handle heartbeat ping from client."""
|
||||
if websocket not in self.clients:
|
||||
return
|
||||
|
||||
# Update client info
|
||||
self.clients[websocket]['last_activity'] = time.time()
|
||||
self.last_heartbeat[websocket] = time.time()
|
||||
self.missed_heartbeats[websocket] = 0
|
||||
|
||||
# Update position if provided
|
||||
if 'position' in data:
|
||||
self.clients[websocket]['position'] = data['position']
|
||||
|
||||
# Send pong response
|
||||
await self._send_pong(websocket)
|
||||
|
||||
logger.debug(f"Heartbeat received from {self.clients[websocket]['user_id']}")
|
||||
|
||||
async def _handle_reconnect(self, websocket, data: Dict[str, Any]):
|
||||
"""Handle reconnect request from client."""
|
||||
user_id = data.get('user_id')
|
||||
position = data.get('position')
|
||||
|
||||
if websocket in self.clients:
|
||||
# Update existing client
|
||||
if user_id:
|
||||
self.clients[websocket]['user_id'] = user_id
|
||||
if position:
|
||||
self.clients[websocket]['position'] = position
|
||||
|
||||
logger.info(f"Client reconnected: {self.clients[websocket]['user_id']}")
|
||||
|
||||
# Reset heartbeat state
|
||||
self.last_heartbeat[websocket] = time.time()
|
||||
self.missed_heartbeats[websocket] = 0
|
||||
|
||||
# Send pong with current user count
|
||||
await self._send_pong(websocket)
|
||||
|
||||
async def _send_pong(self, websocket):
|
||||
"""Send pong response with user count."""
|
||||
try:
|
||||
user_count = len(self.clients)
|
||||
pong = HeartbeatMessage(
|
||||
type=HeartbeatType.PONG,
|
||||
timestamp=time.time(),
|
||||
user_count=user_count
|
||||
)
|
||||
|
||||
await websocket.send(pong.to_json())
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send pong: {e}")
|
||||
|
||||
async def _heartbeat_loop(self, websocket):
|
||||
"""Heartbeat loop for a specific client."""
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(self.heartbeat_interval)
|
||||
|
||||
if websocket not in self.clients:
|
||||
break
|
||||
|
||||
# Check if we've missed heartbeats
|
||||
last_beat = self.last_heartbeat.get(websocket, 0)
|
||||
time_since_last = time.time() - last_beat
|
||||
|
||||
if time_since_last > self.heartbeat_interval * 1.5:
|
||||
# Missed heartbeat
|
||||
self.missed_heartbeats[websocket] = self.missed_heartbeats.get(websocket, 0) + 1
|
||||
missed = self.missed_heartbeats[websocket]
|
||||
|
||||
logger.warning(f"Missed heartbeat {missed}/{self.max_missed} from {self.clients[websocket]['user_id']}")
|
||||
|
||||
if missed >= self.max_missed:
|
||||
# Too many missed heartbeats, close connection
|
||||
logger.warning(f"Closing connection to {self.clients[websocket]['user_id']} due to missed heartbeats")
|
||||
await websocket.close(1000, "Missed heartbeats")
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
# Task was cancelled, clean up
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Error in heartbeat loop: {e}")
|
||||
|
||||
def get_client_info(self, websocket) -> Optional[Dict[str, Any]]:
|
||||
"""Get information about a client."""
|
||||
return self.clients.get(websocket)
|
||||
|
||||
def get_all_clients(self) -> Dict[Any, Dict[str, Any]]:
|
||||
"""Get information about all clients."""
|
||||
return self.clients.copy()
|
||||
|
||||
def get_user_count(self) -> int:
|
||||
"""Get current user count."""
|
||||
return len(self.clients)
|
||||
|
||||
def update_user_position(self, websocket, position: Dict[str, float]):
|
||||
"""Update user position."""
|
||||
if websocket in self.clients:
|
||||
self.clients[websocket]['position'] = position
|
||||
self.clients[websocket]['last_activity'] = time.time()
|
||||
|
||||
# Global heartbeat manager instance
|
||||
heartbeat_manager = HeartbeatManager()
|
||||
|
||||
# Export for use in server.py
|
||||
__all__ = ['HeartbeatManager', 'HeartbeatMessage', 'HeartbeatType', 'heartbeat_manager']
|
||||
132
server.py
132
server.py
@@ -3,17 +3,25 @@
|
||||
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
|
||||
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
|
||||
the body (Evennia/Morrowind), and the visualization surface.
|
||||
|
||||
Issue #1535: feat: WebSocket heartbeat with auto-reconnect from client
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from typing import Set
|
||||
from typing import Set, Dict, Any
|
||||
from pathlib import Path
|
||||
|
||||
# Branch protected file - see POLICY.md
|
||||
import websockets
|
||||
|
||||
# Import heartbeat manager
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from nexus.heartbeat_manager import heartbeat_manager, HeartbeatMessage, HeartbeatType
|
||||
|
||||
# Configuration
|
||||
PORT = 8765
|
||||
HOST = "0.0.0.0" # Allow external connections if needed
|
||||
@@ -35,50 +43,91 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
|
||||
addr = websocket.remote_address
|
||||
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
|
||||
|
||||
# Register with heartbeat manager
|
||||
await heartbeat_manager.register_client(websocket)
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
# Handle heartbeat messages
|
||||
await heartbeat_manager.handle_message(websocket, message)
|
||||
|
||||
# Parse for logging/validation if it's JSON
|
||||
try:
|
||||
data = json.loads(message)
|
||||
msg_type = data.get("type", "unknown")
|
||||
# Optional: log specific important message types
|
||||
if msg_type in ["agent_register", "thought", "action"]:
|
||||
logger.debug(f"Received {msg_type} from {addr}")
|
||||
|
||||
# Skip logging for heartbeat messages
|
||||
if msg_type not in ["heartbeat", "pong"]:
|
||||
# Log specific important message types
|
||||
if msg_type in ["agent_register", "thought", "action"]:
|
||||
logger.debug(f"Received {msg_type} from {addr}")
|
||||
|
||||
# Broadcast to all OTHER clients (non-heartbeat messages)
|
||||
if not clients:
|
||||
continue
|
||||
|
||||
disconnected = set()
|
||||
# Create broadcast tasks, tracking which client each task targets
|
||||
task_client_pairs = []
|
||||
for client in clients:
|
||||
if client != websocket and client.open:
|
||||
task = asyncio.create_task(client.send(message))
|
||||
task_client_pairs.append((task, client))
|
||||
|
||||
if task_client_pairs:
|
||||
tasks = [pair[0] for pair in task_client_pairs]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
target_client = task_client_pairs[i][1]
|
||||
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
|
||||
disconnected.add(target_client)
|
||||
|
||||
if disconnected:
|
||||
clients.difference_update(disconnected)
|
||||
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
|
||||
# Broadcast to all OTHER clients
|
||||
if not clients:
|
||||
continue
|
||||
|
||||
disconnected = set()
|
||||
# Create broadcast tasks, tracking which client each task targets
|
||||
task_client_pairs = []
|
||||
for client in clients:
|
||||
if client != websocket and client.open:
|
||||
task = asyncio.create_task(client.send(message))
|
||||
task_client_pairs.append((task, client))
|
||||
|
||||
if task_client_pairs:
|
||||
tasks = [pair[0] for pair in task_client_pairs]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
target_client = task_client_pairs[i][1]
|
||||
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
|
||||
disconnected.add(target_client)
|
||||
|
||||
if disconnected:
|
||||
clients.difference_update(disconnected)
|
||||
# Not JSON, broadcast raw message
|
||||
if not clients:
|
||||
continue
|
||||
|
||||
disconnected = set()
|
||||
for client in clients:
|
||||
if client != websocket and client.open:
|
||||
try:
|
||||
await client.send(message)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send to client {client.remote_address}: {e}")
|
||||
disconnected.add(client)
|
||||
|
||||
if disconnected:
|
||||
clients.difference_update(disconnected)
|
||||
|
||||
except websockets.exceptions.ConnectionClosed:
|
||||
logger.debug(f"Connection closed by client {addr}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling client {addr}: {e}")
|
||||
finally:
|
||||
# Unregister from heartbeat manager
|
||||
await heartbeat_manager.unregister_client(websocket)
|
||||
clients.discard(websocket)
|
||||
logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")
|
||||
|
||||
async def health_check_handler(websocket: websockets.WebSocketServerProtocol):
|
||||
"""Handle health check requests."""
|
||||
try:
|
||||
# Send health status
|
||||
health_status = {
|
||||
"type": "health",
|
||||
"status": "ok",
|
||||
"timestamp": asyncio.get_event_loop().time(),
|
||||
"clients": len(clients),
|
||||
"heartbeat_clients": heartbeat_manager.get_user_count()
|
||||
}
|
||||
await websocket.send(json.dumps(health_status))
|
||||
except Exception as e:
|
||||
logger.error(f"Health check error: {e}")
|
||||
|
||||
async def main():
|
||||
"""Main server loop with graceful shutdown."""
|
||||
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
|
||||
@@ -98,26 +147,21 @@ async def main():
|
||||
# Signal handlers not supported on Windows
|
||||
pass
|
||||
|
||||
# Start WebSocket server
|
||||
async with websockets.serve(broadcast_handler, HOST, PORT):
|
||||
logger.info("Gateway is ready and listening.")
|
||||
await stop
|
||||
logger.info(f"Nexus WS gateway running on ws://{HOST}:{PORT}")
|
||||
logger.info(f"Heartbeat interval: {heartbeat_manager.heartbeat_interval}s")
|
||||
logger.info(f"Max missed heartbeats: {heartbeat_manager.max_missed}")
|
||||
|
||||
logger.info("Shutting down Nexus WS gateway...")
|
||||
# Close any remaining client connections (handlers may have already cleaned up)
|
||||
remaining = {c for c in clients if c.open}
|
||||
if remaining:
|
||||
logger.info(f"Closing {len(remaining)} active connections...")
|
||||
close_tasks = [client.close() for client in remaining]
|
||||
await asyncio.gather(*close_tasks, return_exceptions=True)
|
||||
clients.clear()
|
||||
|
||||
logger.info("Shutdown complete.")
|
||||
# Wait for shutdown signal
|
||||
await stop
|
||||
logger.info("Shutting down Nexus WS gateway...")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
logger.info("Server stopped by user")
|
||||
except Exception as e:
|
||||
logger.critical(f"Fatal server error: {e}")
|
||||
sys.exit(1)
|
||||
logger.error(f"Server error: {e}")
|
||||
sys.exit(1)
|
||||
172
tests/test_heartbeat.py
Normal file
172
tests/test_heartbeat.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""
|
||||
Tests for WebSocket heartbeat functionality
|
||||
Issue #1535: feat: WebSocket heartbeat with auto-reconnect from client
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
# Add parent directory to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from nexus.heartbeat_manager import HeartbeatManager, HeartbeatMessage, HeartbeatType
|
||||
|
||||
@pytest.fixture
|
||||
def heartbeat_manager():
|
||||
"""Create a fresh heartbeat manager for each test."""
|
||||
return HeartbeatManager(heartbeat_interval=1.0, max_missed=2)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_websocket():
|
||||
"""Create a mock WebSocket connection."""
|
||||
ws = AsyncMock()
|
||||
ws.send = AsyncMock()
|
||||
ws.close = AsyncMock()
|
||||
ws.remote_address = ('127.0.0.1', 12345)
|
||||
return ws
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_client(heartbeat_manager, mock_websocket):
|
||||
"""Test client registration."""
|
||||
await heartbeat_manager.register_client(mock_websocket, 'test_user', {'x': 1, 'y': 2, 'z': 3})
|
||||
|
||||
assert mock_websocket in heartbeat_manager.clients
|
||||
assert heartbeat_manager.clients[mock_websocket]['user_id'] == 'test_user'
|
||||
assert heartbeat_manager.clients[mock_websocket]['position'] == {'x': 1, 'y': 2, 'z': 3}
|
||||
assert mock_websocket in heartbeat_manager.heartbeat_tasks
|
||||
assert heartbeat_manager.get_user_count() == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unregister_client(heartbeat_manager, mock_websocket):
|
||||
"""Test client unregistration."""
|
||||
await heartbeat_manager.register_client(mock_websocket, 'test_user')
|
||||
await heartbeat_manager.unregister_client(mock_websocket)
|
||||
|
||||
assert mock_websocket not in heartbeat_manager.clients
|
||||
assert mock_websocket not in heartbeat_manager.heartbeat_tasks
|
||||
assert heartbeat_manager.get_user_count() == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_heartbeat(heartbeat_manager, mock_websocket):
|
||||
"""Test handling heartbeat ping."""
|
||||
await heartbeat_manager.register_client(mock_websocket, 'test_user')
|
||||
|
||||
# Send heartbeat
|
||||
heartbeat_msg = json.dumps({
|
||||
'type': 'heartbeat',
|
||||
'timestamp': 1234567890,
|
||||
'user_id': 'test_user',
|
||||
'position': {'x': 5, 'y': 10, 'z': 15}
|
||||
})
|
||||
|
||||
await heartbeat_manager.handle_message(mock_websocket, heartbeat_msg)
|
||||
|
||||
# Check that pong was sent
|
||||
mock_websocket.send.assert_called()
|
||||
sent_data = json.loads(mock_websocket.send.call_args[0][0])
|
||||
assert sent_data['type'] == 'pong'
|
||||
assert sent_data['user_count'] == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_reconnect(heartbeat_manager, mock_websocket):
|
||||
"""Test handling reconnect message."""
|
||||
await heartbeat_manager.register_client(mock_websocket, 'old_user')
|
||||
|
||||
# Send reconnect with new user ID
|
||||
reconnect_msg = json.dumps({
|
||||
'type': 'reconnect',
|
||||
'timestamp': 1234567890,
|
||||
'user_id': 'new_user',
|
||||
'position': {'x': 10, 'y': 20, 'z': 30}
|
||||
})
|
||||
|
||||
await heartbeat_manager.handle_message(mock_websocket, reconnect_msg)
|
||||
|
||||
# Check that user ID was updated
|
||||
assert heartbeat_manager.clients[mock_websocket]['user_id'] == 'new_user'
|
||||
assert heartbeat_manager.clients[mock_websocket]['position'] == {'x': 10, 'y': 20, 'z': 30}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_heartbeat_message_serialization():
|
||||
"""Test HeartbeatMessage serialization."""
|
||||
msg = HeartbeatMessage(
|
||||
type=HeartbeatType.PONG,
|
||||
timestamp=1234567890.123,
|
||||
user_count=5
|
||||
)
|
||||
|
||||
json_str = msg.to_json()
|
||||
data = json.loads(json_str)
|
||||
|
||||
assert data['type'] == 'pong'
|
||||
assert data['timestamp'] == 1234567890.123
|
||||
assert data['user_count'] == 5
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_heartbeat_message_deserialization():
|
||||
"""Test HeartbeatMessage deserialization."""
|
||||
json_str = '{"type": "ping", "timestamp": 1234567890.123, "user_id": "test"}'
|
||||
msg = HeartbeatMessage.from_json(json_str)
|
||||
|
||||
assert msg.type == HeartbeatType.PING
|
||||
assert msg.timestamp == 1234567890.123
|
||||
assert msg.user_id == 'test'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_client_info(heartbeat_manager, mock_websocket):
|
||||
"""Test getting client info."""
|
||||
await heartbeat_manager.register_client(mock_websocket, 'test_user', {'x': 1, 'y': 2, 'z': 3})
|
||||
|
||||
info = heartbeat_manager.get_client_info(mock_websocket)
|
||||
assert info is not None
|
||||
assert info['user_id'] == 'test_user'
|
||||
assert info['position'] == {'x': 1, 'y': 2, 'z': 3}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_all_clients(heartbeat_manager, mock_websocket):
|
||||
"""Test getting all clients."""
|
||||
await heartbeat_manager.register_client(mock_websocket, 'test_user')
|
||||
|
||||
all_clients = heartbeat_manager.get_all_clients()
|
||||
assert len(all_clients) == 1
|
||||
assert mock_websocket in all_clients
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user_position(heartbeat_manager, mock_websocket):
|
||||
"""Test updating user position."""
|
||||
await heartbeat_manager.register_client(mock_websocket, 'test_user', {'x': 0, 'y': 0, 'z': 0})
|
||||
|
||||
heartbeat_manager.update_user_position(mock_websocket, {'x': 10, 'y': 20, 'z': 30})
|
||||
|
||||
assert heartbeat_manager.clients[mock_websocket]['position'] == {'x': 10, 'y': 20, 'z': 30}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_clients(heartbeat_manager):
|
||||
"""Test handling multiple clients."""
|
||||
ws1 = AsyncMock()
|
||||
ws1.send = AsyncMock()
|
||||
ws1.remote_address = ('127.0.0.1', 12345)
|
||||
|
||||
ws2 = AsyncMock()
|
||||
ws2.send = AsyncMock()
|
||||
ws2.remote_address = ('127.0.0.1', 12346)
|
||||
|
||||
await heartbeat_manager.register_client(ws1, 'user1')
|
||||
await heartbeat_manager.register_client(ws2, 'user2')
|
||||
|
||||
assert heartbeat_manager.get_user_count() == 2
|
||||
|
||||
# Send heartbeat from first client
|
||||
heartbeat_msg = json.dumps({'type': 'heartbeat', 'timestamp': 1234567890})
|
||||
await heartbeat_manager.handle_message(ws1, heartbeat_msg)
|
||||
|
||||
# Both clients should receive pong
|
||||
ws1.send.assert_called()
|
||||
ws2.send.assert_called()
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
Reference in New Issue
Block a user