#!/usr/bin/env python3
"""
Live Evennia -> Nexus WebSocket bridge.

Two modes:
  1. Live tail: watches Evennia log files and streams parsed events to Nexus WS
  2. Playback: replays a telemetry JSONL file (legacy mode)

The bridge auto-reconnects on both ends and survives Evennia restarts.
"""

from __future__ import annotations

import argparse
import asyncio
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import AsyncIterator, Optional

try:
    import websockets
except ImportError:
    websockets = None

from nexus.evennia_event_adapter import (
    audit_heartbeat,
    command_executed,
    player_join,
    player_leave,
    player_move,
)

ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")

# Regex patterns for log parsing
MOVE_RE = re.compile(r"AUDIT MOVE: (\w+) arrived at (.+?) from (.+)")
CMD_RE = re.compile(r"AUDIT CMD: (\w+) executed '(\w+)'(?: args: '(.*?)')?")
SESSION_START_RE = re.compile(r"AUDIT SESSION: (\w+) puppeted by (\w+)")
SESSION_END_RE = re.compile(r"AUDIT SESSION: (\w+) unpuppeted.*session (\d+)s")
LOGIN_RE = re.compile(r"Logged in: (\w+)\(account \d+\) ([\d.]+)")
LOGOUT_RE = re.compile(r"Logged out: (\w+)\(account \d+\) ([\d.]+)")


def strip_ansi(text: str) -> str:
    """Return *text* with ANSI escape sequences removed (``None`` becomes ``""``)."""
    return ANSI_RE.sub("", text or "")


class LogTailer:
    """Async file tailer that yields new lines as they appear.

    The file is polled every ``poll_interval`` seconds.  ``_offset`` is the
    byte position up to which the file has already been consumed.
    """

    def __init__(self, path: str, poll_interval: float = 0.5):
        self.path = path
        self.poll_interval = poll_interval
        self._offset = 0

    def _read_new_lines(self) -> list[str]:
        """Return the complete lines appended since the last call.

        The file is read in binary mode so that ``_offset`` is a true byte
        offset comparable with ``os.path.getsize``.  Only newline-terminated
        lines are consumed; a trailing partial line stays in the file for the
        next poll so a half-written record is never emitted.

        Raises:
            OSError: if the file exists but cannot be read.
        """
        try:
            size = os.path.getsize(self.path)
        except FileNotFoundError:
            # File is gone (e.g. mid-rotation): whatever appears next is a
            # new file and must be read from its beginning.
            self._offset = 0
            return []

        if size < self._offset:
            # File was truncated/rotated
            self._offset = 0
        if size <= self._offset:
            return []

        with open(self.path, "rb") as f:
            f.seek(self._offset)
            data = f.read()

        complete = data.rfind(b"\n") + 1
        if not complete:
            return []
        self._offset += complete

        # Undecodable bytes are replaced rather than raised, otherwise one
        # bad byte would make the tailer retry the same region forever.
        decoded = (
            raw.decode("utf-8", errors="replace").strip()
            for raw in data[:complete].splitlines()
        )
        return [line for line in decoded if line]

    async def tail(self) -> AsyncIterator[str]:
        """Yield new, stripped, non-empty lines from the file, starting from end.

        Runs forever.  Read errors are reported on stdout and retried after
        two seconds.
        """
        # Start at end of file
        try:
            self._offset = os.path.getsize(self.path)
        except OSError:
            pass

        while True:
            try:
                lines = self._read_new_lines()
            except OSError as e:
                print(f"[tailer] Error reading {self.path}: {e}", flush=True)
                await asyncio.sleep(2)
                continue
            for line in lines:
                yield line
            await asyncio.sleep(self.poll_interval)


def parse_log_line(line: str) -> Optional[dict]:
    """Parse a log line into a Nexus event, or None if not parseable.

    Patterns are tried in a fixed order (move, command, session start,
    session end, login, logout); the first match wins.
    """
    # Movement events: groups are (who, arrived-at, from); the adapter is
    # called with the "from" group before the "arrived at" group.
    m = MOVE_RE.search(line)
    if m:
        return player_move(m.group(1), m.group(3), m.group(2))

    # Command events (the args group is optional in the log format)
    m = CMD_RE.search(line)
    if m:
        return command_executed(m.group(1), m.group(2), m.group(3) or "")

    # Session start: "<group 1> puppeted by <group 2>"
    m = SESSION_START_RE.search(line)
    if m:
        return player_join(m.group(2), m.group(1))

    # Session end: the log line carries no second name, hence the "".
    m = SESSION_END_RE.search(line)
    if m:
        return player_leave("", m.group(1), session_duration=float(m.group(2)))

    # Server login
    m = LOGIN_RE.search(line)
    if m:
        return player_join(m.group(1), ip_address=m.group(2))

    # Server logout
    m = LOGOUT_RE.search(line)
    if m:
        return player_leave(m.group(1))

    return None


async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
    """
    Main live bridge loop.

    Tails all Evennia log files and streams parsed events to Nexus WebSocket.
    Auto-reconnects on failure.

    Args:
        log_dir: Directory containing the Evennia audit/server logs.
        ws_url: Nexus WebSocket URL to stream events to.
        reconnect_delay: Seconds to wait before reconnecting after an error.
    """
    log_files = [
        os.path.join(log_dir, "command_audit.log"),
        os.path.join(log_dir, "movement_audit.log"),
        os.path.join(log_dir, "player_activity.log"),
        os.path.join(log_dir, "server.log"),
    ]

    event_queue: asyncio.Queue = asyncio.Queue(maxsize=10000)

    def enqueue(event: dict) -> None:
        """Queue *event*, evicting the oldest queued event if the queue is full."""
        try:
            event_queue.put_nowait(event)
        except asyncio.QueueFull:
            # Drop oldest so the most recent activity is what survives a
            # long Nexus outage.  No await happens between the two calls, so
            # the freed slot cannot be taken by another task.
            try:
                event_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
            event_queue.put_nowait(event)

    async def tail_file(path: str):
        """Tail a single file and put events on queue."""
        tailer = LogTailer(path)
        async for line in tailer.tail():
            event = parse_log_line(line)
            if event:
                enqueue(event)

    async def log_locally():
        """Print queued events to stdout (fallback when websockets is missing)."""
        print("[bridge] websockets not installed, logging events locally", flush=True)
        while True:
            event = await event_queue.get()
            ts = str(event.get("timestamp") or "")[:19]
            details = {k: v for k, v in event.items() if k not in ("type", "timestamp")}
            # default=str: this output is for human eyes only, so an odd
            # value must not stop the logging loop.
            print(f"[{ts}] {event.get('type')}: {json.dumps(details, default=str)}", flush=True)

    async def ws_sender():
        """Send events from queue to WebSocket, with auto-reconnect."""
        if websockets is None:
            await log_locally()
            return

        # Serialized event that has been dequeued but not yet confirmed sent.
        # It is kept across reconnects so a failed send does not lose it
        # (delivery is therefore at-least-once).
        payload: Optional[str] = None
        while True:
            try:
                print(f"[bridge] Connecting to {ws_url}...", flush=True)
                async with websockets.connect(ws_url) as ws:
                    print(f"[bridge] Connected to Nexus at {ws_url}", flush=True)
                    while True:
                        if payload is None:
                            event = await event_queue.get()
                            try:
                                payload = json.dumps(event)
                            except (TypeError, ValueError) as e:
                                print(f"[bridge] Dropping unserializable event: {e}", flush=True)
                                continue
                        await ws.send(payload)
                        payload = None
            except Exception as e:
                # Top-level boundary of the sender task: any failure means
                # "reconnect and carry on", never "stop the bridge".
                print(f"[bridge] WebSocket error: {e}. Reconnecting in {reconnect_delay}s...", flush=True)
                await asyncio.sleep(reconnect_delay)

    # Start all tailers + sender
    tasks = [asyncio.create_task(tail_file(f)) for f in log_files]
    tasks.append(asyncio.create_task(ws_sender()))

    print(f"[bridge] Live bridge started. Watching {len(log_files)} log files.", flush=True)
    await asyncio.gather(*tasks)


async def playback(log_path: Path, ws_url: str):
    """Legacy mode: replay a telemetry JSONL file.

    Each non-blank line of *log_path* is parsed as JSON, converted to zero or
    more Nexus events and sent to *ws_url* in order.  The file stem is used
    as the Hermes session id.

    Raises:
        RuntimeError: if the ``websockets`` package is not installed.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    from nexus.evennia_event_adapter import (
        actor_located,
        command_issued,
        command_result,
        room_snapshot,
        session_bound,
    )

    if websockets is None:
        raise RuntimeError("websockets package is required for playback mode")

    def clean_lines(text: str) -> list[str]:
        """Split *text* into stripped, non-empty lines with ANSI codes removed."""
        text = strip_ansi(text).replace("\r", "")
        return [line.strip() for line in text.split("\n") if line.strip()]

    def split_listing(line: str) -> list[str]:
        """Split the part after the first ':' on commas and ' and '."""
        raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
        return [t.strip() for t in raw.split(",") if t.strip()]

    def parse_room_output(text: str):
        """Parse room text into title/desc/exits/objects, or None if too short."""
        lines = clean_lines(text)
        if len(lines) < 2:
            return None
        title = lines[0]
        desc = lines[1]
        exits = []
        objects = []
        for line in lines[2:]:
            if line.startswith("Exits:"):
                exits = [
                    {"key": t, "destination_id": t.title(), "destination_key": t.title()}
                    for t in split_listing(line)
                ]
            elif line.startswith("You see:"):
                objects = []
                for p in split_listing(line):
                    # Drop a leading article to get the bare object key.
                    key = p.removeprefix("a ").removeprefix("an ")
                    objects.append({"id": key, "key": key, "short_desc": p})
        return {"title": title, "desc": desc, "exits": exits, "objects": objects}

    def room_events(actor: str, output: str, timestamp) -> list[dict]:
        """Return location + snapshot events if *output* parses as a room."""
        parsed = parse_room_output(output)
        if not parsed:
            return []
        return [
            actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp),
            room_snapshot(
                parsed["title"],
                parsed["title"],
                parsed["desc"],
                exits=parsed["exits"],
                objects=parsed["objects"],
                timestamp=timestamp,
            ),
        ]

    def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
        """Convert one raw telemetry record into a list of Nexus events."""
        out = []
        event = raw.get("event")
        actor = raw.get("actor", "Timmy")
        timestamp = raw.get("timestamp")
        # A record may carry an explicit null output; treat it as empty.
        output = raw.get("output") or ""

        if event == "connect":
            out.append(
                session_bound(
                    hermes_session_id,
                    evennia_account=actor,
                    evennia_character=actor,
                    timestamp=timestamp,
                )
            )
            out.extend(room_events(actor, output, timestamp))
        elif event == "command":
            cmd = raw.get("command", "")
            out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
            # Output starting with one of these prefixes marks a failed command.
            success = not output.startswith(("Command '", "Could not find"))
            out.append(
                command_result(
                    hermes_session_id,
                    actor,
                    cmd,
                    strip_ansi(output),
                    success=success,
                    timestamp=timestamp,
                )
            )
            out.extend(room_events(actor, output, timestamp))
        return out

    hermes_session_id = log_path.stem
    async with websockets.connect(ws_url) as ws:
        for line in log_path.read_text(encoding="utf-8").splitlines():
            if not line.strip():
                continue
            raw = json.loads(line)
            for event in normalize_event(raw, hermes_session_id):
                await ws.send(json.dumps(event))


def main():
    """Command-line entry point: dispatch to live or playback mode."""
    parser = argparse.ArgumentParser(description="Evennia -> Nexus WebSocket Bridge")
    sub = parser.add_subparsers(dest="mode")

    live = sub.add_parser("live", help="Live tail Evennia logs and stream to Nexus")
    live.add_argument(
        "--log-dir",
        default="/root/workspace/timmy-academy/server/logs",
        help="Evennia logs directory",
    )
    live.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL")

    replay = sub.add_parser("playback", help="Replay a telemetry JSONL file")
    replay.add_argument("log_path", help="Path to Evennia telemetry JSONL")
    replay.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL")

    args = parser.parse_args()

    if args.mode == "live":
        asyncio.run(live_bridge(args.log_dir, args.ws))
    elif args.mode == "playback":
        asyncio.run(playback(Path(args.log_path).expanduser(), args.ws))
    else:
        # No sub-command given.
        parser.print_help()


if __name__ == "__main__":
    main()