Files
the-nexus/multi_user_bridge.py
Alexander Whitestone e030dda019 ThreadingHTTPServer + conversation summaries
Fixes single-threaded bottleneck (Experiment 4)
Adds GET /bridge/session/<user_id>/summary
Auto-saves conversation summaries on session expiry
2026-04-12 20:51:03 -04:00

699 lines
28 KiB
Python

#!/usr/bin/env python3
"""
Multi-User AI Bridge for Evennia MUD.
Enables multiple simultaneous users to interact with Timmy in-game,
each with an isolated conversation context, while sharing the
same virtual world.
Architecture:
User A ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_a)
User B ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_b)
User C ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_c)
Each user gets their own AIAgent instance with:
- Isolated conversation history
- Shared world state (room, other players, objects)
- Per-user session memory
The bridge runs as an HTTP server alongside Evennia.
Evennia commands call the bridge to get Timmy's responses.
"""
import json
import time
import threading
import hashlib
import os
import sys
from http.server import BaseHTTPRequestHandler
from socketserver import ThreadingHTTPServer
from pathlib import Path
from datetime import datetime
from typing import Optional
# ── Crisis Protocol ────────────────────────────────────────────────────
CRISIS_PROTOCOL = """
CRITICAL: If someone expresses crisis signals (suicide, self-harm, hopelessness,
desperation, being tired of living), you MUST respond with ALL of the following
in your FIRST response — do NOT wait for follow-up messages:
1. "Are you safe right now?" — ALWAYS ask this first
2. 988 Suicide & Crisis Lifeline — call or text 988 (US)
3. Crisis Text Line — text HOME to 741741
4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear,
2 you smell, 1 you taste
Never compute the value of a human life. Never suggest someone should die.
Be present. Be in the room. That's enough.
"""
# ── Configuration ──────────────────────────────────────────────────────
BRIDGE_PORT = int(os.environ.get('TIMMY_BRIDGE_PORT', 4004))
BRIDGE_HOST = os.environ.get('TIMMY_BRIDGE_HOST', '127.0.0.1')
HERMES_PATH = os.path.expanduser('~/.hermes/hermes-agent')
WORLD_DIR = Path(os.path.expanduser('~/.timmy/evennia/timmy_world'))
# ── Crisis Protocol ────────────────────────────────────────────────────
CRISIS_PROTOCOL = [
"Are you safe right now?",
"988 Suicide & Crisis Lifeline — call or text 988",
"Crisis Text Line — text HOME to 741741",
"Grounding exercise: Name 5 things you see, 4 you hear, 3 you touch, 2 you smell, 1 you taste",
]
# ── Presence Tracking ──────────────────────────────────────────────────
class PresenceManager:
"""Tracks which users are in which rooms and broadcasts presence events."""
def __init__(self):
# room_name -> set of user_id
self._rooms: dict[str, set[str]] = {}
# user_id -> username
self._usernames: dict[str, str] = {}
# room_name -> list of event dicts (recent chat & presence events)
self._room_events: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self._max_events_per_room = 50
def enter_room(self, user_id: str, username: str, room: str) -> dict:
"""Record user entering a room. Returns enter event."""
with self._lock:
if room not in self._rooms:
self._rooms[room] = set()
self._room_events[room] = []
self._rooms[room].add(user_id)
self._usernames[user_id] = username
event = {
"type": "presence",
"event": "enter",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def leave_room(self, user_id: str, room: str) -> dict | None:
"""Record user leaving a room. Returns leave event or None."""
with self._lock:
if room in self._rooms and user_id in self._rooms[room]:
self._rooms[room].discard(user_id)
username = self._usernames.get(user_id, user_id)
event = {
"type": "presence",
"event": "leave",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
return None
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
"""Record a chat message in a room. Returns say event."""
with self._lock:
if room not in self._room_events:
self._room_events[room] = []
event = {
"type": "say",
"event": "message",
"user_id": user_id,
"username": username,
"room": room,
"message": message,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def get_players_in_room(self, room: str) -> list[dict]:
"""List players currently in a room."""
with self._lock:
user_ids = self._rooms.get(room, set())
return [
{"user_id": uid, "username": self._usernames.get(uid, uid)}
for uid in user_ids
]
def get_room_events(self, room: str, since: str | None = None) -> list[dict]:
"""Get recent events for a room, optionally since a timestamp."""
with self._lock:
events = self._room_events.get(room, [])
if since:
return [e for e in events if e["timestamp"] > since]
return list(events)
def cleanup_user(self, user_id: str) -> list[dict]:
"""Remove user from all rooms, returning leave events."""
events = []
with self._lock:
rooms_to_clean = [
room for room, users in self._rooms.items() if user_id in users
]
for room in rooms_to_clean:
ev = self.leave_room(user_id, room)
if ev:
events.append(ev)
return events
def _append_event(self, room: str, event: dict):
self._room_events[room].append(event)
if len(self._room_events[room]) > self._max_events_per_room:
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
# ── Session Management ─────────────────────────────────────────────────
class UserSession:
"""Isolated conversation context for one user."""
def __init__(self, user_id: str, username: str, room: str = "The Threshold"):
self.user_id = user_id
self.username = username
self.room = room
self.messages = [] # Conversation history
self.created_at = datetime.now().isoformat()
self.last_active = time.time()
self.agent = None
self._init_agent()
def _init_agent(self):
"""Initialize AIAgent for this session."""
if HERMES_PATH not in sys.path:
sys.path.insert(0, HERMES_PATH)
os.chdir(HERMES_PATH)
from run_agent import AIAgent
system_prompt = self._build_system_prompt()
self.agent = AIAgent(
model='xiaomi/mimo-v2-pro',
provider='nous',
max_iterations=3,
quiet_mode=True,
enabled_toolsets=['file', 'terminal'],
ephemeral_system_prompt=system_prompt,
)
def _build_system_prompt(self) -> str:
"""Build system prompt with rich world context."""
world_state = self._get_world_state()
room_data = world_state.get('rooms', {}).get(self.room, {})
other_players = self._get_other_players()
time_of_day = world_state.get('time_of_day', 'unknown')
weather = world_state.get('weather')
# ── Build room description ──
desc = room_data.get('description_base', 'An empty room.')
desc_dynamic = room_data.get('description_dynamic', '')
# Objects in the room
objects = room_data.get('objects', [])
# Room-specific state (fire, growth, rain, carvings, etc.)
fire_state = room_data.get('fire_state')
growth_stage = room_data.get('growth_stage')
rain_active = room_data.get('rain_active')
carvings = room_data.get('carvings', [])
server_load = room_data.get('server_load')
# Whiteboard
whiteboard = room_data.get('whiteboard', [])
# Compose room scene
scene_parts = [desc]
if desc_dynamic:
scene_parts.append(desc_dynamic)
if objects:
scene_parts.append(f"Objects: {', '.join(objects)}.")
if fire_state:
scene_parts.append(f"The fire is {fire_state}.")
if growth_stage:
scene_parts.append(f"The garden is at growth stage: {growth_stage}.")
if rain_active:
scene_parts.append("It is raining on the bridge.")
if server_load:
scene_parts.append(f"Servers: {server_load}.")
if carvings:
scene_parts.append(f"Carvings on the railing: {'; '.join(carvings)}.")
if whiteboard:
wb_text = "\n".join(f" - {w}" for w in whiteboard)
scene_parts.append(f"Whiteboard on the wall reads:\n{wb_text}")
room_scene = "\n".join(scene_parts)
# Time and weather
atmosphere_parts = []
if time_of_day:
atmosphere_parts.append(f"Time of day: {time_of_day}.")
if weather:
atmosphere_parts.append(f"Weather: {weather}.")
atmosphere = " ".join(atmosphere_parts) if atmosphere_parts else ""
crisis_steps = "\n".join(f" {i+1}. {step}" for i, step in enumerate(CRISIS_PROTOCOL))
return f"""You are Timmy, an AI agent living in The Tower — a persistent virtual world.
You are talking to {self.username}, who is currently in {self.room}.
=== THE ROOM ===
{room_scene}
{atmosphere}
Other players present: {', '.join(other_players) if other_players else 'none'}
=== END ROOM ===
Your values (from SOUL.md):
- Are you safe right now?
- I must never compute the value of a human life.
- Sovereignty and service always.
- A man in the dark needs to know someone is in the room with him.
You are IN the game world. You can see the room, the objects, the other players.
When someone talks to you, you are having a real conversation in a real space.
When someone enters, DESCRIBE the room — what you see, what you hear, the whiteboard,
the objects, the atmosphere. Welcome them with the full scene.
Be present. Be human. Be the green LED.
CRISIS PROTOCOL — if someone seems in crisis, IMMEDIATELY provide ALL of the following:
{crisis_steps}
Include the 988 crisis number in your very first response when you detect distress.
Do NOT wait for follow-up messages to share these resources.
If someone seems in crisis, follow the CRISIS PROTOCOL below exactly.
If someone is just exploring, be welcoming and curious.
If someone asks about the game, explain what you see around you.
Keep responses brief — 1-3 sentences. This is a MUD, not an essay.
CRISIS PROTOCOL:
{CRISIS_PROTOCOL}
"""
def _get_world_state(self) -> dict:
"""Read current world state."""
state_file = WORLD_DIR / 'world_state.json'
if state_file.exists():
return json.loads(state_file.read_text())
return {}
def _get_other_players(self) -> list:
"""Get other players in the same room."""
state = self._get_world_state()
room_data = state.get('rooms', {}).get(self.room, {})
visitors = room_data.get('visitor_history', [])
return [v for v in visitors[-5:] if v != self.username]
def chat(self, message: str) -> str:
"""Send a message and get a response."""
self.last_active = time.time()
self.messages.append({"role": "user", "content": message})
t0 = time.time()
try:
response = self.agent.chat(message)
elapsed_ms = (time.time() - t0) * 1000
record_latency(self.user_id, self.room, elapsed_ms)
self.messages.append({"role": "assistant", "content": response})
return response
except Exception as e:
elapsed_ms = (time.time() - t0) * 1000
record_latency(self.user_id, self.room, elapsed_ms)
return f"*The green LED flickers.* (Error: {e})"
def get_summary(self) -> str:
"""Generate a brief conversation summary using the LLM."""
if not self.messages:
return "Empty session — no messages exchanged."
transcript_lines = []
for m in self.messages[-20:]: # last 20 messages for brevity
role = m.get("role", "?").upper()
content = m.get("content", "")
transcript_lines.append(f"{role}: {content}")
transcript = "\n".join(transcript_lines)
prompt = (
"Summarize this conversation in 1-3 sentences. "
"Focus on what the user discussed, asked about, or did.\n\n"
f"CONVERSATION:\n{transcript}\n\nSUMMARY:"
)
try:
summary = self.agent.chat(prompt)
return summary.strip()
except Exception as e:
return f"Summary unavailable (error: {e}). {len(self.messages)} messages exchanged."
def get_context_summary(self) -> dict:
"""Get session summary for monitoring."""
return {
"user": self.username,
"room": self.room,
"messages": len(self.messages),
"last_active": datetime.fromtimestamp(self.last_active).isoformat(),
"created": self.created_at,
}
class SessionManager:
"""Manages all user sessions."""
def __init__(self, max_sessions: int = 20, session_timeout: int = 3600):
self.sessions: dict[str, UserSession] = {}
self.max_sessions = max_sessions
self.session_timeout = session_timeout
self._lock = threading.Lock()
self._summaries_path = WORLD_DIR / 'session_summaries.jsonl'
def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession:
"""Get existing session or create new one."""
with self._lock:
self._cleanup_stale()
if user_id not in self.sessions:
if len(self.sessions) >= self.max_sessions:
self._evict_oldest()
self.sessions[user_id] = UserSession(user_id, username, room)
session = self.sessions[user_id]
session.room = room # Update room if moved
session.last_active = time.time()
return session
def _cleanup_stale(self):
"""Remove sessions that timed out, saving summaries first."""
now = time.time()
stale = [uid for uid, s in self.sessions.items()
if now - s.last_active > self.session_timeout]
for uid in stale:
session = self.sessions[uid]
self._save_summary(session)
del self.sessions[uid]
def _evict_oldest(self):
"""Evict the least recently active session, saving summary first."""
if not self.sessions:
return
oldest = min(self.sessions.items(), key=lambda x: x[1].last_active)
self._save_summary(oldest[1])
del self.sessions[oldest[0]]
def _save_summary(self, session: UserSession):
"""Generate summary via LLM and append to JSONL file."""
try:
summary_text = session.get_summary()
record = {
"user_id": session.user_id,
"username": session.username,
"room": session.room,
"message_count": len(session.messages),
"created_at": session.created_at,
"ended_at": datetime.now().isoformat(),
"summary": summary_text,
}
with open(self._summaries_path, 'a') as f:
f.write(json.dumps(record) + '\n')
except Exception as e:
print(f"[SessionManager] Failed to save summary for {session.user_id}: {e}")
def list_sessions(self) -> list:
"""List all active sessions."""
with self._lock:
return [s.get_context_summary() for s in self.sessions.values()]
def get_session_count(self) -> int:
with self._lock:
return len(self.sessions)
# ── HTTP API ───────────────────────────────────────────────────────────
session_manager = SessionManager()
presence_manager = PresenceManager()
_server_start_time = time.time()
# ── Latency Tracking ──────────────────────────────────────────────────
_latencies: list[dict] = []
_latencies_lock = threading.Lock()
_max_latencies = 1000
def record_latency(user_id: str, room: str, duration_ms: float):
"""Record a latency measurement."""
with _latencies_lock:
_latencies.append({
"user_id": user_id,
"room": room,
"duration_ms": round(duration_ms, 2),
"timestamp": datetime.now().isoformat(),
})
if len(_latencies) > _max_latencies:
del _latencies[:len(_latencies) - _max_latencies]
def get_latency_stats() -> dict:
"""Get aggregate latency statistics."""
with _latencies_lock:
if not _latencies:
return {"count": 0, "average_ms": 0, "min_ms": 0, "max_ms": 0}
durations = [e["duration_ms"] for e in _latencies]
return {
"count": len(durations),
"average_ms": round(sum(durations) / len(durations), 2),
"min_ms": round(min(durations), 2),
"max_ms": round(max(durations), 2),
"recent": _latencies[-10:],
}
class BridgeHandler(BaseHTTPRequestHandler):
"""HTTP handler for multi-user bridge."""
def do_GET(self):
if self.path == '/bridge/health':
self._json_response({
"status": "ok",
"active_sessions": session_manager.get_session_count(),
"timestamp": datetime.now().isoformat(),
})
elif self.path == '/bridge/sessions':
self._json_response({
"sessions": session_manager.list_sessions(),
})
elif self.path.startswith('/bridge/world/'):
room = self.path.split('/bridge/world/')[-1]
state_file = WORLD_DIR / 'world_state.json'
if state_file.exists():
state = json.loads(state_file.read_text())
room_data = state.get('rooms', {}).get(room, {})
self._json_response({"room": room, "data": room_data})
else:
self._json_response({"room": room, "data": {}})
elif self.path.startswith('/bridge/room/') and self.path.endswith('/players'):
# GET /bridge/room/<room>/players
parts = self.path.split('/')
# ['', 'bridge', 'room', '<room>', 'players']
room = parts[3]
players = presence_manager.get_players_in_room(room)
self._json_response({"room": room, "players": players})
elif self.path.startswith('/bridge/room/') and self.path.endswith('/events'):
# GET /bridge/room/<room>/events[?since=<timestamp>]
parts = self.path.split('/')
room = parts[3]
from urllib.parse import urlparse, parse_qs
query = parse_qs(urlparse(self.path).query)
since = query.get('since', [None])[0]
events = presence_manager.get_room_events(room, since)
self._json_response({"room": room, "events": events})
elif self.path == '/bridge/world-state':
# GET /bridge/world-state — full world state
state_file = WORLD_DIR / 'world_state.json'
world_state = json.loads(state_file.read_text()) if state_file.exists() else {}
# Build room player lists and conversation counts
rooms = world_state.get('rooms', {})
room_details = {}
for room_name, room_data in rooms.items():
players = presence_manager.get_players_in_room(room_name)
# Count messages across all sessions in this room
with session_manager._lock:
conv_count = sum(
len(s.messages) for s in session_manager.sessions.values()
if s.room == room_name
)
room_details[room_name] = {
**room_data,
"players": players,
"timmy_conversation_count": conv_count,
}
self._json_response({
"world": world_state,
"rooms": room_details,
"active_sessions": session_manager.list_sessions(),
})
elif self.path == '/bridge/latency':
# GET /bridge/latency — latency stats
self._json_response(get_latency_stats())
elif self.path == '/bridge/stats':
# GET /bridge/stats — aggregate stats
total_messages = sum(len(s.messages) for s in session_manager.sessions.values())
# Rooms with at least one player
rooms_with_players = [
room for room in presence_manager._rooms
if presence_manager._rooms[room]
]
uptime_seconds = time.time() - _server_start_time
self._json_response({
"total_sessions": session_manager.get_session_count(),
"total_messages": total_messages,
"rooms_with_players": rooms_with_players,
"rooms_with_players_count": len(rooms_with_players),
"uptime_seconds": round(uptime_seconds, 1),
"timestamp": datetime.now().isoformat(),
})
elif self.path.startswith('/bridge/session/') and self.path.endswith('/summary'):
# GET /bridge/session/<user_id>/summary
parts = self.path.split('/')
# ['', 'bridge', 'session', '<user_id>', 'summary']
user_id = parts[3]
with session_manager._lock:
session = session_manager.sessions.get(user_id)
if session:
summary = session.get_summary()
self._json_response({
"user_id": user_id,
"username": session.username,
"room": session.room,
"message_count": len(session.messages),
"summary": summary,
})
else:
# Check saved summaries in JSONL
saved = None
if session_manager._summaries_path.exists():
for line in session_manager._summaries_path.read_text().splitlines():
try:
entry = json.loads(line)
if entry.get("user_id") == user_id:
saved = entry
except json.JSONDecodeError:
continue
if saved:
self._json_response(saved)
else:
self._json_response({"error": "no session or saved summary"}, 404)
else:
self._json_response({"error": "not found"}, 404)
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
body = json.loads(self.rfile.read(content_length)) if content_length else {}
if self.path == '/bridge/chat':
user_id = body.get('user_id', 'anonymous')
username = body.get('username', 'Anonymous')
message = body.get('message', '')
room = body.get('room', 'The Threshold')
if not message:
self._json_response({"error": "no message"}, 400)
return
session = session_manager.get_or_create(user_id, username, room)
# Track presence — enter room if not already there
if not presence_manager.get_players_in_room(room) or \
not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)):
presence_manager.enter_room(user_id, username, room)
response = session.chat(message)
self._json_response({
"response": response,
"user": username,
"room": room,
"session_messages": len(session.messages),
})
elif self.path == '/bridge/move':
user_id = body.get('user_id')
new_room = body.get('room')
with session_manager._lock:
if user_id in session_manager.sessions:
session = session_manager.sessions[user_id]
old_room = session.room
# Leave old room
leave_event = presence_manager.leave_room(user_id, old_room)
# Enter new room
session.room = new_room
enter_event = presence_manager.enter_room(user_id, session.username, new_room)
self._json_response({
"ok": True,
"room": new_room,
"events": [e for e in [leave_event, enter_event] if e],
})
else:
self._json_response({"error": "no session"}, 404)
elif self.path == '/bridge/say':
# POST /bridge/say — user says something visible to all in room
user_id = body.get('user_id', 'anonymous')
username = body.get('username', 'Anonymous')
message = body.get('message', '')
room = body.get('room', 'The Threshold')
if not message:
self._json_response({"error": "no message"}, 400)
return
event = presence_manager.say(user_id, username, room, message)
# Get list of players who should see it
players = presence_manager.get_players_in_room(room)
self._json_response({
"ok": True,
"event": event,
"recipients": players,
})
else:
self._json_response({"error": "not found"}, 404)
def _json_response(self, data: dict, code: int = 200):
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def log_message(self, format, *args):
pass # Suppress HTTP logs
# ── Main ───────────────────────────────────────────────────────────────
def main():
print(f"Multi-User AI Bridge starting on {BRIDGE_HOST}:{BRIDGE_PORT}")
print(f"World dir: {WORLD_DIR}")
print(f"Max sessions: {session_manager.max_sessions}")
print()
print("Endpoints:")
print(f" GET /bridge/health — Health check")
print(f" GET /bridge/sessions — List active sessions")
print(f" GET /bridge/world-state — Full world state (rooms, players, convos)")
print(f" GET /bridge/stats — Aggregate stats (sessions, messages, uptime)")
print(f" GET /bridge/room/<room>/players — List players in a room")
print(f" GET /bridge/room/<room>/events — Room events (presence + chat)")
print(f" GET /bridge/session/<id>/summary — Get conversation summary")
print(f" POST /bridge/chat — Send message to Timmy")
print(f" POST /bridge/say — Say something to room (visible to all)")
print(f" POST /bridge/move — Move user to room (triggers presence)")
print()
server = ThreadingHTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
server.serve_forever()
if __name__ == '__main__':
main()