diff --git a/bin/__pycache__/nexus_watchdog.cpython-312.pyc b/bin/__pycache__/nexus_watchdog.cpython-312.pyc new file mode 100644 index 0000000..20fd6b1 Binary files /dev/null and b/bin/__pycache__/nexus_watchdog.cpython-312.pyc differ diff --git a/nexus/evennia_event_adapter.py b/nexus/evennia_event_adapter.py index e7dffd9..3bcd16f 100644 --- a/nexus/evennia_event_adapter.py +++ b/nexus/evennia_event_adapter.py @@ -1,4 +1,4 @@ -"""Thin Evennia -> Nexus event normalization helpers.""" +"""Evennia -> Nexus event normalization — v2 with full audit event types.""" from __future__ import annotations @@ -9,6 +9,29 @@ def _ts(value: str | None = None) -> str: return value or datetime.now(timezone.utc).isoformat() +# ── Session Events ────────────────────────────────────────── + +def player_join(account: str, character: str = "", ip_address: str = "", timestamp: str | None = None) -> dict: + return { + "type": "evennia.player_join", + "account": account, + "character": character, + "ip_address": ip_address, + "timestamp": _ts(timestamp), + } + + +def player_leave(account: str, character: str = "", reason: str = "quit", session_duration: float = 0, timestamp: str | None = None) -> dict: + return { + "type": "evennia.player_leave", + "account": account, + "character": character, + "reason": reason, + "session_duration_seconds": session_duration, + "timestamp": _ts(timestamp), + } + + def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evennia_character: str = "Timmy", timestamp: str | None = None) -> dict: return { "type": "evennia.session_bound", @@ -19,6 +42,18 @@ def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evenni } +# ── Movement Events ───────────────────────────────────────── + +def player_move(character: str, from_room: str, to_room: str, timestamp: str | None = None) -> dict: + return { + "type": "evennia.player_move", + "character": character, + "from_room": from_room, + "to_room": to_room, + "timestamp": _ts(timestamp), + } + + def actor_located(actor_id: str, room_key: str, room_name: str | None = None, timestamp: str | None = None) -> dict: return { "type": "evennia.actor_located", @@ -44,6 +79,19 @@ def room_snapshot(room_key: str, title: str, desc: str, exits: list[dict] | None } +# ── Command Events ────────────────────────────────────────── + +def command_executed(character: str, command: str, args: str = "", success: bool = True, timestamp: str | None = None) -> dict: + return { + "type": "evennia.command_executed", + "character": character, + "command": command, + "args": args, + "success": success, + "timestamp": _ts(timestamp), + } + + def command_issued(hermes_session_id: str, actor_id: str, command_text: str, timestamp: str | None = None) -> dict: return { "type": "evennia.command_issued", @@ -64,3 +112,16 @@ def command_result(hermes_session_id: str, actor_id: str, command_text: str, out "success": success, "timestamp": _ts(timestamp), } + + +# ── Audit Summary ─────────────────────────────────────────── + +def audit_heartbeat(characters: list[dict], online_count: int, total_commands: int, total_movements: int, timestamp: str | None = None) -> dict: + return { + "type": "evennia.audit_heartbeat", + "characters": characters, + "online_count": online_count, + "total_commands": total_commands, + "total_movements": total_movements, + "timestamp": _ts(timestamp), + } diff --git a/nexus/evennia_ws_bridge.py b/nexus/evennia_ws_bridge.py index a86140b..3820d37 100644 --- a/nexus/evennia_ws_bridge.py +++ b/nexus/evennia_ws_bridge.py @@ -1,82 +1,238 @@ #!/usr/bin/env python3 -"""Publish Evennia telemetry logs into the Nexus websocket bridge.""" +""" +Live Evennia -> Nexus WebSocket bridge. + +Two modes: +1. Live tail: watches Evennia log files and streams parsed events to Nexus WS +2. Playback: replays a telemetry JSONL file (legacy mode) + +The bridge auto-reconnects on both ends and survives Evennia restarts. +""" from __future__ import annotations import argparse import asyncio import json +import os import re +import sys +import time +from datetime import datetime, timezone from pathlib import Path -from typing import Iterable +from typing import Optional -import websockets +try: + import websockets +except ImportError: + websockets = None -from nexus.evennia_event_adapter import actor_located, command_issued, command_result, room_snapshot, session_bound +from nexus.evennia_event_adapter import ( + audit_heartbeat, + command_executed, + player_join, + player_leave, + player_move, +) ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") +# Regex patterns for log parsing +MOVE_RE = re.compile(r"AUDIT MOVE: (\w+) arrived at (.+?) from (.+)") +CMD_RE = re.compile(r"AUDIT CMD: (\w+) executed '(\w+)'(?: args: '(.*?)')?") +SESSION_START_RE = re.compile(r"AUDIT SESSION: (\w+) puppeted by (\w+)") +SESSION_END_RE = re.compile(r"AUDIT SESSION: (\w+) unpuppeted.*session (\d+)s") +LOGIN_RE = re.compile(r"Logged in: (\w+)\(account \d+\) ([\d.]+)") +LOGOUT_RE = re.compile(r"Logged out: (\w+)\(account \d+\) ([\d.]+)") def strip_ansi(text: str) -> str: return ANSI_RE.sub("", text or "") -def clean_lines(text: str) -> list[str]: - text = strip_ansi(text).replace("\r", "") - return [line.strip() for line in text.split("\n") if line.strip()] +class LogTailer: + """Async file tailer that yields new lines as they appear.""" + + def __init__(self, path: str, poll_interval: float = 0.5): + self.path = path + self.poll_interval = poll_interval + self._offset = 0 + + async def tail(self): + """Yield new lines from the file, starting from end.""" + # Start at end of file + if os.path.exists(self.path): + self._offset = os.path.getsize(self.path) + + while True: + try: + if not os.path.exists(self.path): + await asyncio.sleep(self.poll_interval) + continue + + size = os.path.getsize(self.path) + if size < self._offset: + # File was truncated/rotated + self._offset = 0 + + if size > self._offset: + with open(self.path, "r") as f: + f.seek(self._offset) + for line in f: + line = line.strip() + if line: + yield line + self._offset = f.tell() + + await asyncio.sleep(self.poll_interval) + except Exception as e: + print(f"[tailer] Error reading {self.path}: {e}", flush=True) + await asyncio.sleep(2) -def parse_room_output(text: str): - lines = clean_lines(text) - if len(lines) < 2: - return None - title = lines[0] - desc = lines[1] - exits = [] - objects = [] - for line in lines[2:]: - if line.startswith("Exits:"): - raw = line.split(":", 1)[1].strip() - raw = raw.replace(" and ", ", ") - exits = [{"key": token.strip(), "destination_id": token.strip().title(), "destination_key": token.strip().title()} for token in raw.split(",") if token.strip()] - elif line.startswith("You see:"): - raw = line.split(":", 1)[1].strip() - raw = raw.replace(" and ", ", ") - parts = [token.strip() for token in raw.split(",") if token.strip()] - objects = [{"id": p.removeprefix('a ').removeprefix('an '), "key": p.removeprefix('a ').removeprefix('an '), "short_desc": p} for p in parts] - return {"title": title, "desc": desc, "exits": exits, "objects": objects} +def parse_log_line(line: str) -> Optional[dict]: + """Parse a log line into a Nexus event, or None if not parseable.""" + + # Movement events + m = MOVE_RE.search(line) + if m: + return player_move(m.group(1), m.group(3), m.group(2)) + + # Command events + m = CMD_RE.search(line) + if m: + return command_executed(m.group(1), m.group(2), m.group(3) or "") + + # Session start + m = SESSION_START_RE.search(line) + if m: + return player_join(m.group(2), m.group(1)) + + # Session end + m = SESSION_END_RE.search(line) + if m: + return player_leave("", m.group(1), session_duration=float(m.group(2))) + + # Server login + m = LOGIN_RE.search(line) + if m: + return player_join(m.group(1), ip_address=m.group(2)) + + # Server logout + m = LOGOUT_RE.search(line) + if m: + return player_leave(m.group(1)) + + return None -def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]: - out: list[dict] = [] - event = raw.get("event") - actor = raw.get("actor", "Timmy") - timestamp = raw.get("timestamp") - - if event == "connect": - out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp)) - parsed = parse_room_output(raw.get("output", "")) - if parsed: - out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp)) - out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp)) - return out - - if event == "command": - cmd = raw.get("command", "") - output = raw.get("output", "") - out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp)) - success = not output.startswith("Command '") and not output.startswith("Could not find") - out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp)) - parsed = parse_room_output(output) - if parsed: - out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp)) - out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp)) - return out - - return out +async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0): + """ + Main live bridge loop. + + Tails all Evennia log files and streams parsed events to Nexus WebSocket. + Auto-reconnects on failure. + """ + log_files = [ + os.path.join(log_dir, "command_audit.log"), + os.path.join(log_dir, "movement_audit.log"), + os.path.join(log_dir, "player_activity.log"), + os.path.join(log_dir, "server.log"), + ] + + event_queue: asyncio.Queue = asyncio.Queue(maxsize=10000) + + async def tail_file(path: str): + """Tail a single file and put events on queue.""" + tailer = LogTailer(path) + async for line in tailer.tail(): + event = parse_log_line(line) + if event: + try: + event_queue.put_nowait(event) + except asyncio.QueueFull: + pass # Drop oldest if queue full + + async def ws_sender(): + """Send events from queue to WebSocket, with auto-reconnect.""" + while True: + try: + if websockets is None: + print("[bridge] websockets not installed, logging events locally", flush=True) + while True: + event = await event_queue.get() + ts = event.get("timestamp", "")[:19] + print(f"[{ts}] {event['type']}: {json.dumps({k: v for k, v in event.items() if k not in ('type', 'timestamp')})}", flush=True) + + print(f"[bridge] Connecting to {ws_url}...", flush=True) + async with websockets.connect(ws_url) as ws: + print(f"[bridge] Connected to Nexus at {ws_url}", flush=True) + while True: + event = await event_queue.get() + await ws.send(json.dumps(event)) + except Exception as e: + print(f"[bridge] WebSocket error: {e}. Reconnecting in {reconnect_delay}s...", flush=True) + await asyncio.sleep(reconnect_delay) + + # Start all tailers + sender + tasks = [asyncio.create_task(tail_file(f)) for f in log_files] + tasks.append(asyncio.create_task(ws_sender())) + + print(f"[bridge] Live bridge started. Watching {len(log_files)} log files.", flush=True) + await asyncio.gather(*tasks) async def playback(log_path: Path, ws_url: str): + """Legacy mode: replay a telemetry JSONL file.""" + from nexus.evennia_event_adapter import ( + actor_located, command_issued, command_result, + room_snapshot, session_bound, + ) + + def clean_lines(text: str) -> list[str]: + text = strip_ansi(text).replace("\r", "") + return [line.strip() for line in text.split("\n") if line.strip()] + + def parse_room_output(text: str): + lines = clean_lines(text) + if len(lines) < 2: + return None + title = lines[0] + desc = lines[1] + exits = [] + objects = [] + for line in lines[2:]: + if line.startswith("Exits:"): + raw = line.split(":", 1)[1].strip().replace(" and ", ", ") + exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()] + elif line.startswith("You see:"): + raw = line.split(":", 1)[1].strip().replace(" and ", ", ") + parts = [t.strip() for t in raw.split(",") if t.strip()] + objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts] + return {"title": title, "desc": desc, "exits": exits, "objects": objects} + + def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]: + out = [] + event = raw.get("event") + actor = raw.get("actor", "Timmy") + timestamp = raw.get("timestamp") + if event == "connect": + out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp)) + parsed = parse_room_output(raw.get("output", "")) + if parsed: + out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp)) + out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp)) + elif event == "command": + cmd = raw.get("command", "") + output = raw.get("output", "") + out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp)) + success = not output.startswith("Command '") and not output.startswith("Could not find") + out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp)) + parsed = parse_room_output(output) + if parsed: + out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp)) + out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp)) + return out + hermes_session_id = log_path.stem async with websockets.connect(ws_url) as ws: for line in log_path.read_text(encoding="utf-8").splitlines(): @@ -88,11 +244,25 @@ async def playback(log_path: Path, ws_url: str): def main(): - parser = argparse.ArgumentParser(description="Publish Evennia telemetry into the Nexus websocket bridge") - parser.add_argument("log_path", help="Path to Evennia telemetry JSONL") - parser.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus websocket bridge URL") + parser = argparse.ArgumentParser(description="Evennia -> Nexus WebSocket Bridge") + sub = parser.add_subparsers(dest="mode") + + live = sub.add_parser("live", help="Live tail Evennia logs and stream to Nexus") + live.add_argument("--log-dir", default="/root/workspace/timmy-academy/server/logs", help="Evennia logs directory") + live.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL") + + replay = sub.add_parser("playback", help="Replay a telemetry JSONL file") + replay.add_argument("log_path", help="Path to Evennia telemetry JSONL") + replay.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL") + args = parser.parse_args() - asyncio.run(playback(Path(args.log_path).expanduser(), args.ws)) + + if args.mode == "live": + asyncio.run(live_bridge(args.log_dir, args.ws)) + elif args.mode == "playback": + asyncio.run(playback(Path(args.log_path).expanduser(), args.ws)) + else: + parser.print_help() if __name__ == "__main__":