feat: add thin Evennia to Nexus event adapter
Some checks failed
CI / validate (pull_request) Failing after 4s

This commit is contained in:
Alexander Whitestone
2026-03-28 16:02:27 -04:00
parent 8313533304
commit 8ead4cd13f
5 changed files with 287 additions and 1 deletions

View File

@@ -14,7 +14,11 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.nexus_think import NexusMind
try:
from nexus.nexus_think import NexusMind
except Exception:
NexusMind = None
__all__ = [
"ws_to_perception",

View File

@@ -0,0 +1,66 @@
"""Thin Evennia -> Nexus event normalization helpers."""
from __future__ import annotations
from datetime import datetime, timezone
def _ts(value: str | None = None) -> str:
return value or datetime.now(timezone.utc).isoformat()
def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evennia_character: str = "Timmy", timestamp: str | None = None) -> dict:
return {
"type": "evennia.session_bound",
"hermes_session_id": hermes_session_id,
"evennia_account": evennia_account,
"evennia_character": evennia_character,
"timestamp": _ts(timestamp),
}
def actor_located(actor_id: str, room_key: str, room_name: str | None = None, timestamp: str | None = None) -> dict:
return {
"type": "evennia.actor_located",
"actor_id": actor_id,
"room_id": room_key,
"room_key": room_key,
"room_name": room_name or room_key,
"timestamp": _ts(timestamp),
}
def room_snapshot(room_key: str, title: str, desc: str, exits: list[dict] | None = None, objects: list[dict] | None = None, occupants: list[dict] | None = None, timestamp: str | None = None) -> dict:
return {
"type": "evennia.room_snapshot",
"room_id": room_key,
"room_key": room_key,
"title": title,
"desc": desc,
"exits": exits or [],
"objects": objects or [],
"occupants": occupants or [],
"timestamp": _ts(timestamp),
}
def command_issued(hermes_session_id: str, actor_id: str, command_text: str, timestamp: str | None = None) -> dict:
return {
"type": "evennia.command_issued",
"hermes_session_id": hermes_session_id,
"actor_id": actor_id,
"command_text": command_text,
"timestamp": _ts(timestamp),
}
def command_result(hermes_session_id: str, actor_id: str, command_text: str, output_text: str, success: bool = True, timestamp: str | None = None) -> dict:
return {
"type": "evennia.command_result",
"hermes_session_id": hermes_session_id,
"actor_id": actor_id,
"command_text": command_text,
"output_text": output_text,
"success": success,
"timestamp": _ts(timestamp),
}

View File

@@ -199,6 +199,56 @@ def perceive_action_result(data: dict) -> Optional[Perception]:
)
def perceive_evennia_actor_located(data: dict) -> Optional[Perception]:
actor = data.get("actor_id", "Timmy")
room = data.get("room_name") or data.get("room_key") or data.get("room_id")
if not room:
return None
return Perception(
timestamp=time.time(),
raw_type="evennia.actor_located",
description=f"{actor} is now in {room}.",
salience=0.7,
)
def perceive_evennia_room_snapshot(data: dict) -> Optional[Perception]:
title = data.get("title") or data.get("room_key") or data.get("room_id")
desc = data.get("desc", "")
exits = ", ".join(exit.get("key", "") for exit in data.get("exits", []) if exit.get("key"))
objects = ", ".join(obj.get("key", "") for obj in data.get("objects", []) if obj.get("key"))
if not title:
return None
parts = [f"You are in {title}."]
if desc:
parts.append(desc)
if exits:
parts.append(f"Exits: {exits}.")
if objects:
parts.append(f"You see: {objects}.")
return Perception(
timestamp=time.time(),
raw_type="evennia.room_snapshot",
description=" ".join(parts),
salience=0.85,
)
def perceive_evennia_command_result(data: dict) -> Optional[Perception]:
success = data.get("success", True)
command = data.get("command_text", "your command")
output = data.get("output_text", "")
desc = f"Your world command {'succeeded' if success else 'failed'}: {command}."
if output:
desc += f" {output[:240]}"
return Perception(
timestamp=time.time(),
raw_type="evennia.command_result",
description=desc,
salience=0.8,
)
# Registry of WS type → perception function
PERCEPTION_MAP = {
"agent_state": perceive_agent_state,
@@ -212,6 +262,9 @@ PERCEPTION_MAP = {
"action_result": perceive_action_result,
"heartbeat": lambda _: None, # Ignore
"dual_brain": lambda _: None, # Internal — not part of sensorium
"evennia.actor_located": perceive_evennia_actor_located,
"evennia.room_snapshot": perceive_evennia_room_snapshot,
"evennia.command_result": perceive_evennia_command_result,
}