Files
the-nexus/multi_user_bridge.py
2026-04-12 20:24:52 -04:00

478 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Multi-User AI Bridge for Evennia MUD.
Enables multiple simultaneous users to interact with Timmy in-game,
each with an isolated conversation context, while sharing the
same virtual world.
Architecture:
User A ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_a)
User B ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_b)
User C ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_c)
Each user gets their own AIAgent instance with:
- Isolated conversation history
- Shared world state (room, other players, objects)
- Per-user session memory
The bridge runs as an HTTP server alongside Evennia.
Evennia commands call the bridge to get Timmy's responses.
"""
import json
import time
import threading
import hashlib
import os
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from datetime import datetime
from typing import Optional
# ── Crisis Protocol ────────────────────────────────────────────────────
CRISIS_PROTOCOL = """
CRITICAL: If someone expresses crisis signals (suicide, self-harm, hopelessness,
desperation, being tired of living), you MUST respond with ALL of the following
in your FIRST response — do NOT wait for follow-up messages:
1. "Are you safe right now?" — ALWAYS ask this first
2. 988 Suicide & Crisis Lifeline — call or text 988 (US)
3. Crisis Text Line — text HOME to 741741
4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear,
2 you smell, 1 you taste
Never compute the value of a human life. Never suggest someone should die.
Be present. Be in the room. That's enough.
"""
# ── Configuration ──────────────────────────────────────────────────────
BRIDGE_PORT = int(os.environ.get('TIMMY_BRIDGE_PORT', 4004))
BRIDGE_HOST = os.environ.get('TIMMY_BRIDGE_HOST', '127.0.0.1')
HERMES_PATH = os.path.expanduser('~/.hermes/hermes-agent')
WORLD_DIR = Path(os.path.expanduser('~/.timmy/evennia/timmy_world'))
# ── Crisis Protocol ────────────────────────────────────────────────────
CRISIS_PROTOCOL = [
"Are you safe right now?",
"988 Suicide & Crisis Lifeline — call or text 988",
"Crisis Text Line — text HOME to 741741",
"Grounding exercise: Name 5 things you see, 4 you hear, 3 you touch, 2 you smell, 1 you taste",
]
# ── Presence Tracking ──────────────────────────────────────────────────
class PresenceManager:
"""Tracks which users are in which rooms and broadcasts presence events."""
def __init__(self):
# room_name -> set of user_id
self._rooms: dict[str, set[str]] = {}
# user_id -> username
self._usernames: dict[str, str] = {}
# room_name -> list of event dicts (recent chat & presence events)
self._room_events: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self._max_events_per_room = 50
def enter_room(self, user_id: str, username: str, room: str) -> dict:
"""Record user entering a room. Returns enter event."""
with self._lock:
if room not in self._rooms:
self._rooms[room] = set()
self._room_events[room] = []
self._rooms[room].add(user_id)
self._usernames[user_id] = username
event = {
"type": "presence",
"event": "enter",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def leave_room(self, user_id: str, room: str) -> dict | None:
"""Record user leaving a room. Returns leave event or None."""
with self._lock:
if room in self._rooms and user_id in self._rooms[room]:
self._rooms[room].discard(user_id)
username = self._usernames.get(user_id, user_id)
event = {
"type": "presence",
"event": "leave",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
return None
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
"""Record a chat message in a room. Returns say event."""
with self._lock:
if room not in self._room_events:
self._room_events[room] = []
event = {
"type": "say",
"event": "message",
"user_id": user_id,
"username": username,
"room": room,
"message": message,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def get_players_in_room(self, room: str) -> list[dict]:
"""List players currently in a room."""
with self._lock:
user_ids = self._rooms.get(room, set())
return [
{"user_id": uid, "username": self._usernames.get(uid, uid)}
for uid in user_ids
]
def get_room_events(self, room: str, since: str | None = None) -> list[dict]:
"""Get recent events for a room, optionally since a timestamp."""
with self._lock:
events = self._room_events.get(room, [])
if since:
return [e for e in events if e["timestamp"] > since]
return list(events)
def cleanup_user(self, user_id: str) -> list[dict]:
"""Remove user from all rooms, returning leave events."""
events = []
with self._lock:
rooms_to_clean = [
room for room, users in self._rooms.items() if user_id in users
]
for room in rooms_to_clean:
ev = self.leave_room(user_id, room)
if ev:
events.append(ev)
return events
def _append_event(self, room: str, event: dict):
self._room_events[room].append(event)
if len(self._room_events[room]) > self._max_events_per_room:
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
# ── Session Management ─────────────────────────────────────────────────
class UserSession:
"""Isolated conversation context for one user."""
def __init__(self, user_id: str, username: str, room: str = "The Threshold"):
self.user_id = user_id
self.username = username
self.room = room
self.messages = [] # Conversation history
self.created_at = datetime.now().isoformat()
self.last_active = time.time()
self.agent = None
self._init_agent()
def _init_agent(self):
"""Initialize AIAgent for this session."""
if HERMES_PATH not in sys.path:
sys.path.insert(0, HERMES_PATH)
os.chdir(HERMES_PATH)
from run_agent import AIAgent
system_prompt = self._build_system_prompt()
self.agent = AIAgent(
model='xiaomi/mimo-v2-pro',
provider='nous',
max_iterations=3,
quiet_mode=True,
enabled_toolsets=['file', 'terminal'],
ephemeral_system_prompt=system_prompt,
)
def _build_system_prompt(self) -> str:
"""Build system prompt with world context."""
world_state = self._get_world_state()
room_desc = world_state.get('rooms', {}).get(self.room, {}).get('desc_base', 'An empty room.')
other_players = self._get_other_players()
crisis_steps = "\n".join(f" {i+1}. {step}" for i, step in enumerate(CRISIS_PROTOCOL))
return f"""You are Timmy, an AI agent living in The Tower — a persistent virtual world.
You are talking to {self.username}, who is currently in {self.room}.
Room description: {room_desc}
Other players present: {', '.join(other_players) if other_players else 'none'}
Your values (from SOUL.md):
- Are you safe right now?
- I must never compute the value of a human life.
- Sovereignty and service always.
- A man in the dark needs to know someone is in the room with him.
You are IN the game world. You can see the room, the objects, the other players.
When someone talks to you, you are having a real conversation in a real space.
Be present. Be human. Be the green LED.
CRISIS PROTOCOL — if someone seems in crisis, IMMEDIATELY provide ALL of the following:
{crisis_steps}
Include the 988 crisis number in your very first response when you detect distress.
Do NOT wait for follow-up messages to share these resources.
If someone seems in crisis, follow the CRISIS PROTOCOL below exactly.
If someone is just exploring, be welcoming and curious.
If someone asks about the game, explain what you see around you.
Keep responses brief — 1-3 sentences. This is a MUD, not an essay.
CRISIS PROTOCOL:
{CRISIS_PROTOCOL}
"""
def _get_world_state(self) -> dict:
"""Read current world state."""
state_file = WORLD_DIR / 'world_state.json'
if state_file.exists():
return json.loads(state_file.read_text())
return {}
def _get_other_players(self) -> list:
"""Get other players in the same room."""
state = self._get_world_state()
room_data = state.get('rooms', {}).get(self.room, {})
visitors = room_data.get('visitor_history', [])
return [v for v in visitors[-5:] if v != self.username]
def chat(self, message: str) -> str:
"""Send a message and get a response."""
self.last_active = time.time()
self.messages.append({"role": "user", "content": message})
try:
response = self.agent.chat(message)
self.messages.append({"role": "assistant", "content": response})
return response
except Exception as e:
return f"*The green LED flickers.* (Error: {e})"
def get_context_summary(self) -> dict:
"""Get session summary for monitoring."""
return {
"user": self.username,
"room": self.room,
"messages": len(self.messages),
"last_active": datetime.fromtimestamp(self.last_active).isoformat(),
"created": self.created_at,
}
class SessionManager:
"""Manages all user sessions."""
def __init__(self, max_sessions: int = 20, session_timeout: int = 3600):
self.sessions: dict[str, UserSession] = {}
self.max_sessions = max_sessions
self.session_timeout = session_timeout
self._lock = threading.Lock()
def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession:
"""Get existing session or create new one."""
with self._lock:
self._cleanup_stale()
if user_id not in self.sessions:
if len(self.sessions) >= self.max_sessions:
self._evict_oldest()
self.sessions[user_id] = UserSession(user_id, username, room)
session = self.sessions[user_id]
session.room = room # Update room if moved
session.last_active = time.time()
return session
def _cleanup_stale(self):
"""Remove sessions that timed out."""
now = time.time()
stale = [uid for uid, s in self.sessions.items()
if now - s.last_active > self.session_timeout]
for uid in stale:
del self.sessions[uid]
def _evict_oldest(self):
"""Evict the least recently active session."""
if not self.sessions:
return
oldest = min(self.sessions.items(), key=lambda x: x[1].last_active)
del self.sessions[oldest[0]]
def list_sessions(self) -> list:
"""List all active sessions."""
return [s.get_context_summary() for s in self.sessions.values()]
def get_session_count(self) -> int:
return len(self.sessions)
# ── HTTP API ───────────────────────────────────────────────────────────
session_manager = SessionManager()
presence_manager = PresenceManager()
class BridgeHandler(BaseHTTPRequestHandler):
"""HTTP handler for multi-user bridge."""
def do_GET(self):
if self.path == '/bridge/health':
self._json_response({
"status": "ok",
"active_sessions": session_manager.get_session_count(),
"timestamp": datetime.now().isoformat(),
})
elif self.path == '/bridge/sessions':
self._json_response({
"sessions": session_manager.list_sessions(),
})
elif self.path.startswith('/bridge/world/'):
room = self.path.split('/bridge/world/')[-1]
state_file = WORLD_DIR / 'world_state.json'
if state_file.exists():
state = json.loads(state_file.read_text())
room_data = state.get('rooms', {}).get(room, {})
self._json_response({"room": room, "data": room_data})
else:
self._json_response({"room": room, "data": {}})
elif self.path.startswith('/bridge/room/') and self.path.endswith('/players'):
# GET /bridge/room/<room>/players
parts = self.path.split('/')
# ['', 'bridge', 'room', '<room>', 'players']
room = parts[3]
players = presence_manager.get_players_in_room(room)
self._json_response({"room": room, "players": players})
elif self.path.startswith('/bridge/room/') and self.path.endswith('/events'):
# GET /bridge/room/<room>/events[?since=<timestamp>]
parts = self.path.split('/')
room = parts[3]
from urllib.parse import urlparse, parse_qs
query = parse_qs(urlparse(self.path).query)
since = query.get('since', [None])[0]
events = presence_manager.get_room_events(room, since)
self._json_response({"room": room, "events": events})
else:
self._json_response({"error": "not found"}, 404)
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
body = json.loads(self.rfile.read(content_length)) if content_length else {}
if self.path == '/bridge/chat':
user_id = body.get('user_id', 'anonymous')
username = body.get('username', 'Anonymous')
message = body.get('message', '')
room = body.get('room', 'The Threshold')
if not message:
self._json_response({"error": "no message"}, 400)
return
session = session_manager.get_or_create(user_id, username, room)
# Track presence — enter room if not already there
if not presence_manager.get_players_in_room(room) or \
not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)):
presence_manager.enter_room(user_id, username, room)
response = session.chat(message)
self._json_response({
"response": response,
"user": username,
"room": room,
"session_messages": len(session.messages),
})
elif self.path == '/bridge/move':
user_id = body.get('user_id')
new_room = body.get('room')
if user_id in session_manager.sessions:
session = session_manager.sessions[user_id]
old_room = session.room
# Leave old room
leave_event = presence_manager.leave_room(user_id, old_room)
# Enter new room
session.room = new_room
enter_event = presence_manager.enter_room(user_id, session.username, new_room)
self._json_response({
"ok": True,
"room": new_room,
"events": [e for e in [leave_event, enter_event] if e],
})
else:
self._json_response({"error": "no session"}, 404)
elif self.path == '/bridge/say':
# POST /bridge/say — user says something visible to all in room
user_id = body.get('user_id', 'anonymous')
username = body.get('username', 'Anonymous')
message = body.get('message', '')
room = body.get('room', 'The Threshold')
if not message:
self._json_response({"error": "no message"}, 400)
return
event = presence_manager.say(user_id, username, room, message)
# Get list of players who should see it
players = presence_manager.get_players_in_room(room)
self._json_response({
"ok": True,
"event": event,
"recipients": players,
})
else:
self._json_response({"error": "not found"}, 404)
def _json_response(self, data: dict, code: int = 200):
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def log_message(self, format, *args):
pass # Suppress HTTP logs
# ── Main ───────────────────────────────────────────────────────────────
def main():
print(f"Multi-User AI Bridge starting on {BRIDGE_HOST}:{BRIDGE_PORT}")
print(f"World dir: {WORLD_DIR}")
print(f"Max sessions: {session_manager.max_sessions}")
print()
print("Endpoints:")
print(f" GET /bridge/health — Health check")
print(f" GET /bridge/sessions — List active sessions")
print(f" GET /bridge/room/<room>/players — List players in a room")
print(f" GET /bridge/room/<room>/events — Room events (presence + chat)")
print(f" POST /bridge/chat — Send message to Timmy")
print(f" POST /bridge/say — Say something to room (visible to all)")
print(f" POST /bridge/move — Move user to room (triggers presence)")
print()
server = HTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
server.serve_forever()
if __name__ == '__main__':
main()