feat: Evennia-Nexus Bridge v2 — Live Event Streaming (#804) (#807)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled

Co-authored-by: Allegro <allegro@hermes.local>
Co-committed-by: Allegro <allegro@hermes.local>
This commit was merged in pull request #807.
This commit is contained in:
2026-04-04 01:39:38 +00:00
committed by Claude (Opus 4.6)
parent 211ea1178d
commit 8645798ed4
3 changed files with 289 additions and 58 deletions

Binary file not shown.

View File

@@ -1,4 +1,4 @@
"""Thin Evennia -> Nexus event normalization helpers.""" """Evennia -> Nexus event normalization — v2 with full audit event types."""
from __future__ import annotations from __future__ import annotations
@@ -9,6 +9,29 @@ def _ts(value: str | None = None) -> str:
return value or datetime.now(timezone.utc).isoformat() return value or datetime.now(timezone.utc).isoformat()
# ── Session Events ──────────────────────────────────────────
def player_join(account: str, character: str = "", ip_address: str = "", timestamp: str | None = None) -> dict:
return {
"type": "evennia.player_join",
"account": account,
"character": character,
"ip_address": ip_address,
"timestamp": _ts(timestamp),
}
def player_leave(account: str, character: str = "", reason: str = "quit", session_duration: float = 0, timestamp: str | None = None) -> dict:
return {
"type": "evennia.player_leave",
"account": account,
"character": character,
"reason": reason,
"session_duration_seconds": session_duration,
"timestamp": _ts(timestamp),
}
def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evennia_character: str = "Timmy", timestamp: str | None = None) -> dict: def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evennia_character: str = "Timmy", timestamp: str | None = None) -> dict:
return { return {
"type": "evennia.session_bound", "type": "evennia.session_bound",
@@ -19,6 +42,18 @@ def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evenni
} }
# ── Movement Events ─────────────────────────────────────────
def player_move(character: str, from_room: str, to_room: str, timestamp: str | None = None) -> dict:
return {
"type": "evennia.player_move",
"character": character,
"from_room": from_room,
"to_room": to_room,
"timestamp": _ts(timestamp),
}
def actor_located(actor_id: str, room_key: str, room_name: str | None = None, timestamp: str | None = None) -> dict: def actor_located(actor_id: str, room_key: str, room_name: str | None = None, timestamp: str | None = None) -> dict:
return { return {
"type": "evennia.actor_located", "type": "evennia.actor_located",
@@ -44,6 +79,19 @@ def room_snapshot(room_key: str, title: str, desc: str, exits: list[dict] | None
} }
# ── Command Events ──────────────────────────────────────────
def command_executed(character: str, command: str, args: str = "", success: bool = True, timestamp: str | None = None) -> dict:
return {
"type": "evennia.command_executed",
"character": character,
"command": command,
"args": args,
"success": success,
"timestamp": _ts(timestamp),
}
def command_issued(hermes_session_id: str, actor_id: str, command_text: str, timestamp: str | None = None) -> dict: def command_issued(hermes_session_id: str, actor_id: str, command_text: str, timestamp: str | None = None) -> dict:
return { return {
"type": "evennia.command_issued", "type": "evennia.command_issued",
@@ -64,3 +112,16 @@ def command_result(hermes_session_id: str, actor_id: str, command_text: str, out
"success": success, "success": success,
"timestamp": _ts(timestamp), "timestamp": _ts(timestamp),
} }
# ── Audit Summary ───────────────────────────────────────────
def audit_heartbeat(characters: list[dict], online_count: int, total_commands: int, total_movements: int, timestamp: str | None = None) -> dict:
return {
"type": "evennia.audit_heartbeat",
"characters": characters,
"online_count": online_count,
"total_commands": total_commands,
"total_movements": total_movements,
"timestamp": _ts(timestamp),
}

View File

@@ -1,82 +1,238 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Publish Evennia telemetry logs into the Nexus websocket bridge.""" """
Live Evennia -> Nexus WebSocket bridge.
Two modes:
1. Live tail: watches Evennia log files and streams parsed events to Nexus WS
2. Playback: replays a telemetry JSONL file (legacy mode)
The bridge auto-reconnects on both ends and survives Evennia restarts.
"""
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import asyncio import asyncio
import json import json
import os
import re import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Iterable from typing import Optional
import websockets try:
import websockets
except ImportError:
websockets = None
from nexus.evennia_event_adapter import actor_located, command_issued, command_result, room_snapshot, session_bound from nexus.evennia_event_adapter import (
audit_heartbeat,
command_executed,
player_join,
player_leave,
player_move,
)
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
# Regex patterns for log parsing
MOVE_RE = re.compile(r"AUDIT MOVE: (\w+) arrived at (.+?) from (.+)")
CMD_RE = re.compile(r"AUDIT CMD: (\w+) executed '(\w+)'(?: args: '(.*?)')?")
SESSION_START_RE = re.compile(r"AUDIT SESSION: (\w+) puppeted by (\w+)")
SESSION_END_RE = re.compile(r"AUDIT SESSION: (\w+) unpuppeted.*session (\d+)s")
LOGIN_RE = re.compile(r"Logged in: (\w+)\(account \d+\) ([\d.]+)")
LOGOUT_RE = re.compile(r"Logged out: (\w+)\(account \d+\) ([\d.]+)")
def strip_ansi(text: str) -> str: def strip_ansi(text: str) -> str:
return ANSI_RE.sub("", text or "") return ANSI_RE.sub("", text or "")
def clean_lines(text: str) -> list[str]: class LogTailer:
text = strip_ansi(text).replace("\r", "") """Async file tailer that yields new lines as they appear."""
return [line.strip() for line in text.split("\n") if line.strip()]
def __init__(self, path: str, poll_interval: float = 0.5):
self.path = path
self.poll_interval = poll_interval
self._offset = 0
async def tail(self):
"""Yield new lines from the file, starting from end."""
# Start at end of file
if os.path.exists(self.path):
self._offset = os.path.getsize(self.path)
while True:
try:
if not os.path.exists(self.path):
await asyncio.sleep(self.poll_interval)
continue
size = os.path.getsize(self.path)
if size < self._offset:
# File was truncated/rotated
self._offset = 0
if size > self._offset:
with open(self.path, "r") as f:
f.seek(self._offset)
for line in f:
line = line.strip()
if line:
yield line
self._offset = f.tell()
await asyncio.sleep(self.poll_interval)
except Exception as e:
print(f"[tailer] Error reading {self.path}: {e}", flush=True)
await asyncio.sleep(2)
def parse_room_output(text: str): def parse_log_line(line: str) -> Optional[dict]:
lines = clean_lines(text) """Parse a log line into a Nexus event, or None if not parseable."""
if len(lines) < 2:
return None # Movement events
title = lines[0] m = MOVE_RE.search(line)
desc = lines[1] if m:
exits = [] return player_move(m.group(1), m.group(3), m.group(2))
objects = []
for line in lines[2:]: # Command events
if line.startswith("Exits:"): m = CMD_RE.search(line)
raw = line.split(":", 1)[1].strip() if m:
raw = raw.replace(" and ", ", ") return command_executed(m.group(1), m.group(2), m.group(3) or "")
exits = [{"key": token.strip(), "destination_id": token.strip().title(), "destination_key": token.strip().title()} for token in raw.split(",") if token.strip()]
elif line.startswith("You see:"): # Session start
raw = line.split(":", 1)[1].strip() m = SESSION_START_RE.search(line)
raw = raw.replace(" and ", ", ") if m:
parts = [token.strip() for token in raw.split(",") if token.strip()] return player_join(m.group(2), m.group(1))
objects = [{"id": p.removeprefix('a ').removeprefix('an '), "key": p.removeprefix('a ').removeprefix('an '), "short_desc": p} for p in parts]
return {"title": title, "desc": desc, "exits": exits, "objects": objects} # Session end
m = SESSION_END_RE.search(line)
if m:
return player_leave("", m.group(1), session_duration=float(m.group(2)))
# Server login
m = LOGIN_RE.search(line)
if m:
return player_join(m.group(1), ip_address=m.group(2))
# Server logout
m = LOGOUT_RE.search(line)
if m:
return player_leave(m.group(1))
return None
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]: async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
out: list[dict] = [] """
event = raw.get("event") Main live bridge loop.
actor = raw.get("actor", "Timmy")
timestamp = raw.get("timestamp") Tails all Evennia log files and streams parsed events to Nexus WebSocket.
Auto-reconnects on failure.
if event == "connect": """
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp)) log_files = [
parsed = parse_room_output(raw.get("output", "")) os.path.join(log_dir, "command_audit.log"),
if parsed: os.path.join(log_dir, "movement_audit.log"),
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp)) os.path.join(log_dir, "player_activity.log"),
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp)) os.path.join(log_dir, "server.log"),
return out ]
if event == "command": event_queue: asyncio.Queue = asyncio.Queue(maxsize=10000)
cmd = raw.get("command", "")
output = raw.get("output", "") async def tail_file(path: str):
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp)) """Tail a single file and put events on queue."""
success = not output.startswith("Command '") and not output.startswith("Could not find") tailer = LogTailer(path)
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp)) async for line in tailer.tail():
parsed = parse_room_output(output) event = parse_log_line(line)
if parsed: if event:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp)) try:
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp)) event_queue.put_nowait(event)
return out except asyncio.QueueFull:
pass # Drop oldest if queue full
return out
async def ws_sender():
"""Send events from queue to WebSocket, with auto-reconnect."""
while True:
try:
if websockets is None:
print("[bridge] websockets not installed, logging events locally", flush=True)
while True:
event = await event_queue.get()
ts = event.get("timestamp", "")[:19]
print(f"[{ts}] {event['type']}: {json.dumps({k: v for k, v in event.items() if k not in ('type', 'timestamp')})}", flush=True)
print(f"[bridge] Connecting to {ws_url}...", flush=True)
async with websockets.connect(ws_url) as ws:
print(f"[bridge] Connected to Nexus at {ws_url}", flush=True)
while True:
event = await event_queue.get()
await ws.send(json.dumps(event))
except Exception as e:
print(f"[bridge] WebSocket error: {e}. Reconnecting in {reconnect_delay}s...", flush=True)
await asyncio.sleep(reconnect_delay)
# Start all tailers + sender
tasks = [asyncio.create_task(tail_file(f)) for f in log_files]
tasks.append(asyncio.create_task(ws_sender()))
print(f"[bridge] Live bridge started. Watching {len(log_files)} log files.", flush=True)
await asyncio.gather(*tasks)
async def playback(log_path: Path, ws_url: str): async def playback(log_path: Path, ws_url: str):
"""Legacy mode: replay a telemetry JSONL file."""
from nexus.evennia_event_adapter import (
actor_located, command_issued, command_result,
room_snapshot, session_bound,
)
def clean_lines(text: str) -> list[str]:
text = strip_ansi(text).replace("\r", "")
return [line.strip() for line in text.split("\n") if line.strip()]
def parse_room_output(text: str):
lines = clean_lines(text)
if len(lines) < 2:
return None
title = lines[0]
desc = lines[1]
exits = []
objects = []
for line in lines[2:]:
if line.startswith("Exits:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()]
elif line.startswith("You see:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
parts = [t.strip() for t in raw.split(",") if t.strip()]
objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts]
return {"title": title, "desc": desc, "exits": exits, "objects": objects}
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
out = []
event = raw.get("event")
actor = raw.get("actor", "Timmy")
timestamp = raw.get("timestamp")
if event == "connect":
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
parsed = parse_room_output(raw.get("output", ""))
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
elif event == "command":
cmd = raw.get("command", "")
output = raw.get("output", "")
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
success = not output.startswith("Command '") and not output.startswith("Could not find")
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
parsed = parse_room_output(output)
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
return out
hermes_session_id = log_path.stem hermes_session_id = log_path.stem
async with websockets.connect(ws_url) as ws: async with websockets.connect(ws_url) as ws:
for line in log_path.read_text(encoding="utf-8").splitlines(): for line in log_path.read_text(encoding="utf-8").splitlines():
@@ -88,11 +244,25 @@ async def playback(log_path: Path, ws_url: str):
def main(): def main():
parser = argparse.ArgumentParser(description="Publish Evennia telemetry into the Nexus websocket bridge") parser = argparse.ArgumentParser(description="Evennia -> Nexus WebSocket Bridge")
parser.add_argument("log_path", help="Path to Evennia telemetry JSONL") sub = parser.add_subparsers(dest="mode")
parser.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus websocket bridge URL")
live = sub.add_parser("live", help="Live tail Evennia logs and stream to Nexus")
live.add_argument("--log-dir", default="/root/workspace/timmy-academy/server/logs", help="Evennia logs directory")
live.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL")
replay = sub.add_parser("playback", help="Replay a telemetry JSONL file")
replay.add_argument("log_path", help="Path to Evennia telemetry JSONL")
replay.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL")
args = parser.parse_args() args = parser.parse_args()
asyncio.run(playback(Path(args.log_path).expanduser(), args.ws))
if args.mode == "live":
asyncio.run(live_bridge(args.log_dir, args.ws))
elif args.mode == "playback":
asyncio.run(playback(Path(args.log_path).expanduser(), args.ws))
else:
parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":