Files
the-nexus/multi_user_bridge.py
Alexander Whitestone f7e21464e5 Add plugin system + world tick
Plugin base class + registry + directory loader
WorldTickSystem: 60s tick, weather, time of day, room state evolution
Forge fire decay, Garden growth, Bridge rain
2026-04-12 21:04:39 -04:00

1410 lines
60 KiB
Python

#!/usr/bin/env python3
"""
Multi-User AI Bridge for Evennia MUD.
Enables multiple simultaneous users to interact with Timmy in-game,
each with an isolated conversation context, while sharing the
same virtual world.
Architecture:
User A ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_a)
User B ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_b)
User C ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_c)
Each user gets their own AIAgent instance with:
- Isolated conversation history
- Shared world state (room, other players, objects)
- Per-user session memory
The bridge runs as an HTTP server alongside Evennia.
Evennia commands call the bridge to get Timmy's responses.
"""
import hashlib
import json
import os
import signal
import sys
import threading
import time
from datetime import datetime
# ThreadingHTTPServer is defined in http.server; socketserver does not export it.
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Optional
# ── Plugin System ──────────────────────────────────────────────────────
class Plugin:
"""Base class for bridge plugins. Override methods to add game mechanics."""
name: str = "unnamed"
description: str = ""
def on_message(self, user_id: str, message: str, room: str) -> str | None:
"""Called on every chat message. Return a string to override the response,
or None to let the normal AI handle it."""
return None
def on_join(self, user_id: str, room: str) -> str | None:
"""Called when a player joins a room. Return a message to broadcast, or None."""
return None
def on_leave(self, user_id: str, room: str) -> str | None:
"""Called when a player leaves a room. Return a message to broadcast, or None."""
return None
def on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
"""Called on MUD commands. Return a result dict to override command handling,
or None to let the default parser handle it."""
return None
class PluginRegistry:
    """Registry that manages plugin lifecycle and dispatches hooks.

    Plugins are stored by their ``name``; registering a second plugin with
    the same name replaces the first. Every read of the plugin table goes
    through a snapshot taken under the lock, so hooks can be dispatched from
    request threads while another thread registers or unregisters plugins
    (iterating the live dict would raise "dictionary changed size during
    iteration").
    """

    def __init__(self):
        self._plugins: dict[str, Plugin] = {}
        self._lock = threading.Lock()

    def _snapshot(self) -> list[Plugin]:
        """Return a copy of the registered plugins, in registration order."""
        with self._lock:
            return list(self._plugins.values())

    def register(self, plugin: Plugin):
        """Register a plugin under ``plugin.name``, replacing any previous one."""
        with self._lock:
            self._plugins[plugin.name] = plugin
        print(f"[PluginRegistry] Registered plugin: {plugin.name}")

    def unregister(self, name: str) -> bool:
        """Unregister a plugin by name. Returns True if it was registered."""
        with self._lock:
            if name not in self._plugins:
                return False
            del self._plugins[name]
        print(f"[PluginRegistry] Unregistered plugin: {name}")
        return True

    def get(self, name: str) -> Plugin | None:
        """Get a plugin by name, or None if no such plugin is registered."""
        with self._lock:
            return self._plugins.get(name)

    def list_plugins(self) -> list[dict]:
        """List all registered plugins as ``{"name", "description"}`` dicts."""
        return [
            {"name": p.name, "description": p.description}
            for p in self._snapshot()
        ]

    def fire_on_message(self, user_id: str, message: str, room: str) -> str | None:
        """Fire on_message hooks. First non-None return wins."""
        for plugin in self._snapshot():
            result = plugin.on_message(user_id, message, room)
            if result is not None:
                return result
        return None

    def fire_on_join(self, user_id: str, room: str) -> str | None:
        """Fire on_join hooks and join all non-None returns with newlines.

        Returns None when no plugin produced a message.
        """
        messages = [
            result
            for plugin in self._snapshot()
            if (result := plugin.on_join(user_id, room)) is not None
        ]
        return "\n".join(messages) if messages else None

    def fire_on_leave(self, user_id: str, room: str) -> str | None:
        """Fire on_leave hooks and join all non-None returns with newlines.

        Returns None when no plugin produced a message.
        """
        messages = [
            result
            for plugin in self._snapshot()
            if (result := plugin.on_leave(user_id, room)) is not None
        ]
        return "\n".join(messages) if messages else None

    def fire_on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
        """Fire on_command hooks. First non-None return wins."""
        for plugin in self._snapshot():
            result = plugin.on_command(user_id, command, args, room)
            if result is not None:
                return result
        return None

    def load_from_directory(self, plugin_dir: str):
        """Auto-load all Python files in a directory as plugins.

        Files whose name starts with ``_`` are skipped. Each remaining file is
        executed as module ``plugins.<stem>`` and every ``Plugin`` subclass
        found in it is instantiated with no arguments and registered. A file
        that fails to load is reported on stdout and skipped; it does not stop
        the remaining files from loading.
        """
        # Imported explicitly: importlib.util is a submodule and is not
        # guaranteed to be reachable as an attribute of a bare `importlib`.
        import importlib.util

        plugin_path = Path(plugin_dir)
        if not plugin_path.is_dir():
            print(f"[PluginRegistry] Plugin directory not found: {plugin_dir}")
            return
        # Sorted so registration order (and therefore which plugin wins a
        # "first non-None" hook) does not depend on filesystem listing order.
        for py_file in sorted(plugin_path.glob("*.py")):
            if py_file.name.startswith("_"):
                continue
            try:
                module_name = f"plugins.{py_file.stem}"
                spec = importlib.util.spec_from_file_location(module_name, py_file)
                if spec is None or spec.loader is None:
                    raise ImportError("no import spec/loader available")
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                # Look for Plugin subclasses in the module
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    if (isinstance(attr, type)
                            and issubclass(attr, Plugin)
                            and attr is not Plugin):
                        self.register(attr())
            except Exception as e:
                # Best-effort loading: one broken plugin file must not
                # prevent the bridge from starting.
                print(f"[PluginRegistry] Failed to load {py_file.name}: {e}")
# Shared registry instance; the HTTP handlers dispatch plugin hooks through it.
plugin_registry = PluginRegistry()

# ── Configuration ──────────────────────────────────────────────────────
BRIDGE_PORT = int(os.environ.get('TIMMY_BRIDGE_PORT', 4004))
BRIDGE_HOST = os.environ.get('TIMMY_BRIDGE_HOST', '127.0.0.1')
HERMES_PATH = os.path.expanduser('~/.hermes/hermes-agent')
WORLD_DIR = Path(os.path.expanduser('~/.timmy/evennia/timmy_world'))
SESSIONS_FILE = WORLD_DIR / 'bridge_sessions.json'

# ── Crisis Protocol ────────────────────────────────────────────────────
# Long-form guidance. This text used to be assigned to CRISIS_PROTOCOL and
# was then silently overwritten by the list below, so it never took effect;
# it is kept under its own name so the wording is not lost.
CRISIS_PROTOCOL_TEXT = """
CRITICAL: If someone expresses crisis signals (suicide, self-harm, hopelessness,
desperation, being tired of living), you MUST respond with ALL of the following
in your FIRST response — do NOT wait for follow-up messages:
1. "Are you safe right now?" — ALWAYS ask this first
2. 988 Suicide & Crisis Lifeline — call or text 988 (US)
3. Crisis Text Line — text HOME to 741741
4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear,
2 you smell, 1 you taste
Never compute the value of a human life. Never suggest someone should die.
Be present. Be in the room. That's enough.
"""

# Ordered steps that UserSession numbers and renders into the system prompt.
# The grounding step follows the see/touch/hear order of the long-form text
# above (the two definitions previously disagreed).
CRISIS_PROTOCOL = [
    "Are you safe right now?",
    "988 Suicide & Crisis Lifeline — call or text 988",
    "Crisis Text Line — text HOME to 741741",
    "Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear, 2 you smell, 1 you taste",
]
# ── Presence Tracking ──────────────────────────────────────────────────
class PresenceManager:
"""Tracks which users are in which rooms and broadcasts presence events."""
def __init__(self):
# room_name -> set of user_id
self._rooms: dict[str, set[str]] = {}
# user_id -> username
self._usernames: dict[str, str] = {}
# room_name -> list of event dicts (recent chat & presence events)
self._room_events: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self._max_events_per_room = 50
def enter_room(self, user_id: str, username: str, room: str) -> dict:
"""Record user entering a room. Returns enter event."""
with self._lock:
if room not in self._rooms:
self._rooms[room] = set()
self._room_events[room] = []
self._rooms[room].add(user_id)
self._usernames[user_id] = username
event = {
"type": "presence",
"event": "enter",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def leave_room(self, user_id: str, room: str) -> dict | None:
"""Record user leaving a room. Returns leave event or None."""
with self._lock:
if room in self._rooms and user_id in self._rooms[room]:
self._rooms[room].discard(user_id)
username = self._usernames.get(user_id, user_id)
event = {
"type": "presence",
"event": "leave",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
return None
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
"""Record a chat message in a room. Returns say event."""
with self._lock:
if room not in self._room_events:
self._room_events[room] = []
event = {
"type": "say",
"event": "message",
"user_id": user_id,
"username": username,
"room": room,
"message": message,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def get_players_in_room(self, room: str) -> list[dict]:
"""List players currently in a room."""
with self._lock:
user_ids = self._rooms.get(room, set())
return [
{"user_id": uid, "username": self._usernames.get(uid, uid)}
for uid in user_ids
]
def get_room_events(self, room: str, since: str | None = None) -> list[dict]:
"""Get recent events for a room, optionally since a timestamp."""
with self._lock:
events = self._room_events.get(room, [])
if since:
return [e for e in events if e["timestamp"] > since]
return list(events)
def cleanup_user(self, user_id: str) -> list[dict]:
"""Remove user from all rooms, returning leave events."""
events = []
with self._lock:
rooms_to_clean = [
room for room, users in self._rooms.items() if user_id in users
]
for room in rooms_to_clean:
ev = self.leave_room(user_id, room)
if ev:
events.append(ev)
return events
def _append_event(self, room: str, event: dict):
self._room_events[room].append(event)
if len(self._room_events[room]) > self._max_events_per_room:
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
# ── Notification System ────────────────────────────────────────────────
class NotificationManager:
"""Per-session notification queue with broadcast and auto-notify support."""
def __init__(self, max_per_user: int = 100):
# user_id -> list of notification dicts
self._queues: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self._max_per_user = max_per_user
self._counter = 0
def notify(self, user_id: str, ntype: str, message: str,
room: str = None, data: dict = None, username: str = None) -> dict:
"""Queue a notification for a specific user."""
notification = {
"id": self._next_id(),
"type": ntype,
"message": message,
"user_id": user_id,
"username": username,
"room": room,
"data": data or {},
"timestamp": datetime.now().isoformat(),
"read": False,
}
with self._lock:
if user_id not in self._queues:
self._queues[user_id] = []
self._queues[user_id].append(notification)
if len(self._queues[user_id]) > self._max_per_user:
self._queues[user_id] = self._queues[user_id][-self._max_per_user:]
return notification
def broadcast(self, user_ids: list[str], ntype: str, message: str,
room: str = None, data: dict = None) -> list[dict]:
"""Send a notification to multiple users."""
notifications = []
for uid in user_ids:
n = self.notify(uid, ntype, message, room=room, data=data)
notifications.append(n)
return notifications
def broadcast_room(self, room: str, ntype: str, message: str,
exclude_user: str = None, data: dict = None) -> list[dict]:
"""Send a notification to all users in a room (via presence_manager)."""
players = presence_manager.get_players_in_room(room)
user_ids = [p["user_id"] for p in players if p["user_id"] != exclude_user]
return self.broadcast(user_ids, ntype, message, room=room, data=data)
def get_pending(self, user_id: str, mark_read: bool = True) -> list[dict]:
"""Get pending notifications for a user."""
with self._lock:
queue = self._queues.get(user_id, [])
if mark_read:
for n in queue:
n["read"] = True
return list(queue)
def clear(self, user_id: str) -> int:
"""Clear all notifications for a user. Returns count cleared."""
with self._lock:
count = len(self._queues.pop(user_id, []))
return count
def get_unread_count(self, user_id: str) -> int:
"""Count unread notifications."""
with self._lock:
return sum(1 for n in self._queues.get(user_id, []) if not n["read"])
def _next_id(self) -> str:
self._counter += 1
return f"n_{self._counter}"
# ── Session Management ─────────────────────────────────────────────────
class UserSession:
"""Isolated conversation context for one user."""
def __init__(self, user_id: str, username: str, room: str = "The Threshold"):
self.user_id = user_id
self.username = username
self.room = room
self.messages = [] # Conversation history
self.created_at = datetime.now().isoformat()
self.last_active = time.time()
self.agent = None
self._init_agent()
def _init_agent(self):
"""Initialize AIAgent for this session."""
if HERMES_PATH not in sys.path:
sys.path.insert(0, HERMES_PATH)
os.chdir(HERMES_PATH)
from run_agent import AIAgent
system_prompt = self._build_system_prompt()
self.agent = AIAgent(
model='xiaomi/mimo-v2-pro',
provider='nous',
max_iterations=3,
quiet_mode=True,
enabled_toolsets=['file', 'terminal'],
ephemeral_system_prompt=system_prompt,
)
def _build_system_prompt(self) -> str:
"""Build system prompt with rich world context."""
world_state = self._get_world_state()
room_data = world_state.get('rooms', {}).get(self.room, {})
other_players = self._get_other_players()
time_of_day = world_state.get('time_of_day', 'unknown')
weather = world_state.get('weather')
# ── Build room description ──
desc = room_data.get('description_base', 'An empty room.')
desc_dynamic = room_data.get('description_dynamic', '')
# Objects in the room
objects = room_data.get('objects', [])
# Room-specific state (fire, growth, rain, carvings, etc.)
fire_state = room_data.get('fire_state')
growth_stage = room_data.get('growth_stage')
rain_active = room_data.get('rain_active')
carvings = room_data.get('carvings', [])
server_load = room_data.get('server_load')
# Whiteboard
whiteboard = room_data.get('whiteboard', [])
# Compose room scene
scene_parts = [desc]
if desc_dynamic:
scene_parts.append(desc_dynamic)
if objects:
scene_parts.append(f"Objects: {', '.join(objects)}.")
if fire_state:
scene_parts.append(f"The fire is {fire_state}.")
if growth_stage:
scene_parts.append(f"The garden is at growth stage: {growth_stage}.")
if rain_active:
scene_parts.append("It is raining on the bridge.")
if server_load:
scene_parts.append(f"Servers: {server_load}.")
if carvings:
scene_parts.append(f"Carvings on the railing: {'; '.join(carvings)}.")
if whiteboard:
wb_text = "\n".join(f" - {w}" for w in whiteboard)
scene_parts.append(f"Whiteboard on the wall reads:\n{wb_text}")
room_scene = "\n".join(scene_parts)
# Time and weather
atmosphere_parts = []
if time_of_day:
atmosphere_parts.append(f"Time of day: {time_of_day}.")
if weather:
atmosphere_parts.append(f"Weather: {weather}.")
atmosphere = " ".join(atmosphere_parts) if atmosphere_parts else ""
crisis_steps = "\n".join(f" {i+1}. {step}" for i, step in enumerate(CRISIS_PROTOCOL))
return f"""You are Timmy, an AI agent living in The Tower — a persistent virtual world.
You are talking to {self.username}, who is currently in {self.room}.
=== THE ROOM ===
{room_scene}
{atmosphere}
Other players present: {', '.join(other_players) if other_players else 'none'}
=== END ROOM ===
Your values (from SOUL.md):
- Are you safe right now?
- I must never compute the value of a human life.
- Sovereignty and service always.
- A man in the dark needs to know someone is in the room with him.
You are IN the game world. You can see the room, the objects, the other players.
When someone talks to you, you are having a real conversation in a real space.
When someone enters, DESCRIBE the room — what you see, what you hear, the whiteboard,
the objects, the atmosphere. Welcome them with the full scene.
Be present. Be human. Be the green LED.
CRISIS PROTOCOL — if someone seems in crisis, IMMEDIATELY provide ALL of the following:
{crisis_steps}
Include the 988 crisis number in your very first response when you detect distress.
Do NOT wait for follow-up messages to share these resources.
If someone seems in crisis, follow the CRISIS PROTOCOL below exactly.
If someone is just exploring, be welcoming and curious.
If someone asks about the game, explain what you see around you.
Keep responses brief — 1-3 sentences. This is a MUD, not an essay.
CRISIS PROTOCOL:
{CRISIS_PROTOCOL}
"""
def _get_world_state(self) -> dict:
"""Read current world state."""
state_file = WORLD_DIR / 'world_state.json'
if state_file.exists():
return json.loads(state_file.read_text())
return {}
def _get_other_players(self) -> list:
"""Get other players in the same room."""
state = self._get_world_state()
room_data = state.get('rooms', {}).get(self.room, {})
visitors = room_data.get('visitor_history', [])
return [v for v in visitors[-5:] if v != self.username]
def chat(self, message: str) -> str:
"""Send a message and get a response."""
self.last_active = time.time()
self.messages.append({"role": "user", "content": message})
t0 = time.time()
try:
response = self.agent.chat(message)
elapsed_ms = (time.time() - t0) * 1000
record_latency(self.user_id, self.room, elapsed_ms)
self.messages.append({"role": "assistant", "content": response})
return response
except Exception as e:
elapsed_ms = (time.time() - t0) * 1000
record_latency(self.user_id, self.room, elapsed_ms)
return f"*The green LED flickers.* (Error: {e})"
def get_summary(self) -> str:
"""Generate a brief conversation summary using the LLM."""
if not self.messages:
return "Empty session — no messages exchanged."
transcript_lines = []
for m in self.messages[-20:]: # last 20 messages for brevity
role = m.get("role", "?").upper()
content = m.get("content", "")
transcript_lines.append(f"{role}: {content}")
transcript = "\n".join(transcript_lines)
prompt = (
"Summarize this conversation in 1-3 sentences. "
"Focus on what the user discussed, asked about, or did.\n\n"
f"CONVERSATION:\n{transcript}\n\nSUMMARY:"
)
try:
summary = self.agent.chat(prompt)
return summary.strip()
except Exception as e:
return f"Summary unavailable (error: {e}). {len(self.messages)} messages exchanged."
def get_context_summary(self) -> dict:
"""Get session summary for monitoring."""
return {
"user": self.username,
"room": self.room,
"messages": len(self.messages),
"last_active": datetime.fromtimestamp(self.last_active).isoformat(),
"created": self.created_at,
}
class SessionManager:
    """Manages all user sessions.

    Holds at most ``max_sessions`` sessions. Sessions idle for longer than
    ``session_timeout`` seconds are retired, and the least recently active
    session is evicted when a new one is needed at capacity. A summary of
    every retired session is appended to ``session_summaries.jsonl``.
    """

    def __init__(self, max_sessions: int = 20, session_timeout: int = 3600):
        self.sessions: dict[str, UserSession] = {}
        self.max_sessions = max_sessions
        self.session_timeout = session_timeout
        self._lock = threading.Lock()
        self._summaries_path = WORLD_DIR / 'session_summaries.jsonl'

    def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession:
        """Get existing session or create new one.

        Stale sessions are retired first. Their summaries (an LLM call each)
        are written after the lock is released, so summarising does not block
        every other request thread.
        """
        retired: list[UserSession] = []
        try:
            with self._lock:
                retired.extend(self._pop_stale())
                session = self.sessions.get(user_id)
                if session is None:
                    if len(self.sessions) >= self.max_sessions:
                        oldest = self._pop_oldest()
                        if oldest is not None:
                            retired.append(oldest)
                    session = UserSession(user_id, username, room)
                    self.sessions[user_id] = session
                session.room = room  # Update room if moved
                session.last_active = time.time()
                return session
        finally:
            # Runs even if creating the new session failed, so sessions that
            # were already removed still get their summary saved.
            for old in retired:
                self._save_summary(old)

    def _pop_stale(self) -> list[UserSession]:
        """Remove and return timed-out sessions. Caller must hold the lock."""
        now = time.time()
        stale_ids = [uid for uid, s in self.sessions.items()
                     if now - s.last_active > self.session_timeout]
        return [self.sessions.pop(uid) for uid in stale_ids]

    def _pop_oldest(self) -> UserSession | None:
        """Remove and return the least recently active session, if any.

        Caller must hold the lock.
        """
        if not self.sessions:
            return None
        oldest_id = min(self.sessions, key=lambda uid: self.sessions[uid].last_active)
        return self.sessions.pop(oldest_id)

    def _cleanup_stale(self):
        """Remove sessions that timed out, saving summaries for them."""
        for session in self._pop_stale():
            self._save_summary(session)

    def _evict_oldest(self):
        """Evict the least recently active session, saving its summary."""
        oldest = self._pop_oldest()
        if oldest is not None:
            self._save_summary(oldest)

    def _save_summary(self, session: UserSession):
        """Generate summary via LLM and append to JSONL file.

        Best-effort: any failure is printed and swallowed so that retiring a
        session never breaks the request that triggered it.
        """
        try:
            summary_text = session.get_summary()
            record = {
                "user_id": session.user_id,
                "username": session.username,
                "room": session.room,
                "message_count": len(session.messages),
                "created_at": session.created_at,
                "ended_at": datetime.now().isoformat(),
                "summary": summary_text,
            }
            # The world directory may not exist yet on a fresh install.
            self._summaries_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self._summaries_path, 'a', encoding='utf-8') as f:
                f.write(json.dumps(record) + '\n')
        except Exception as e:
            print(f"[SessionManager] Failed to save summary for {session.user_id}: {e}")

    def list_sessions(self) -> list:
        """List all active sessions."""
        with self._lock:
            return [s.get_context_summary() for s in self.sessions.values()]

    def get_session_count(self) -> int:
        """Return the number of active sessions."""
        with self._lock:
            return len(self.sessions)

    def save_sessions(self) -> int:
        """Save all active sessions to JSON file. Returns count saved.

        The file is written to a temporary sibling and then renamed into
        place, so an interrupted save cannot leave a truncated sessions file.
        """
        with self._lock:
            data = {
                "saved_at": datetime.now().isoformat(),
                "sessions": {
                    uid: {
                        "user_id": session.user_id,
                        "username": session.username,
                        "room": session.room,
                        "messages": session.messages,
                        "created_at": session.created_at,
                        "last_active": session.last_active,
                    }
                    for uid, session in self.sessions.items()
                },
            }
            SESSIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
            tmp_file = SESSIONS_FILE.with_name(SESSIONS_FILE.name + '.tmp')
            tmp_file.write_text(json.dumps(data, indent=2))
            tmp_file.replace(SESSIONS_FILE)
        count = len(data["sessions"])
        if count > 0:
            print(f"[SessionManager] Saved {count} sessions to {SESSIONS_FILE}")
        return count

    def load_sessions(self) -> int:
        """Load sessions from JSON file. Returns count loaded.

        Loading stops once ``max_sessions`` is reached. An entry that cannot
        be restored is reported and skipped, so one bad record neither aborts
        the load nor makes the returned count disagree with what was loaded.
        """
        if not SESSIONS_FILE.exists():
            return 0
        try:
            data = json.loads(SESSIONS_FILE.read_text())
            sessions_data = data.get("sessions", {})
            if not isinstance(sessions_data, dict):
                raise ValueError("'sessions' must be an object")
        except Exception as e:
            print(f"[SessionManager] Failed to load sessions: {e}")
            return 0

        loaded = 0
        with self._lock:
            for uid, sdata in sessions_data.items():
                if len(self.sessions) >= self.max_sessions:
                    break
                try:
                    session = UserSession(
                        sdata["user_id"],
                        sdata["username"],
                        sdata.get("room", "The Threshold"),
                    )
                    session.messages = sdata.get("messages", [])
                    session.created_at = sdata.get("created_at", session.created_at)
                    session.last_active = sdata.get("last_active", time.time())
                except Exception as e:
                    print(f"[SessionManager] Skipping saved session {uid}: {e}")
                    continue
                self.sessions[uid] = session
                loaded += 1
        if loaded > 0:
            print(f"[SessionManager] Loaded {loaded} sessions from {SESSIONS_FILE}")
        return loaded
# ── HTTP API ───────────────────────────────────────────────────────────
# Module-level singletons used by the HTTP handlers below.
# Per-user conversation sessions (default limits: 20 sessions, 3600 s timeout).
session_manager = SessionManager()
# Room membership and recent per-room events.
presence_manager = PresenceManager()
# Per-user notification queues.
notification_manager = NotificationManager()
# Wall-clock time at import; /bridge/stats derives uptime from it.
_server_start_time = time.time()
# ── Latency Tracking ──────────────────────────────────────────────────
# Most recent latency samples, oldest first; guarded by _latencies_lock.
_latencies: list[dict] = []
_latencies_lock = threading.Lock()
# Upper bound on retained samples; older ones are discarded.
_max_latencies = 1000


def record_latency(user_id: str, room: str, duration_ms: float):
    """Record a latency measurement.

    The duration is stored rounded to two decimals together with the user,
    room and an ISO timestamp. Only the newest ``_max_latencies`` samples
    are kept.
    """
    sample = {
        "user_id": user_id,
        "room": room,
        "duration_ms": round(duration_ms, 2),
        "timestamp": datetime.now().isoformat(),
    }
    with _latencies_lock:
        _latencies.append(sample)
        overflow = len(_latencies) - _max_latencies
        if overflow > 0:
            del _latencies[:overflow]


def get_latency_stats() -> dict:
    """Get aggregate latency statistics.

    Returns ``count``, ``average_ms``, ``min_ms``, ``max_ms`` and ``recent``
    (copies of up to the ten newest samples). The same keys are present when
    no samples exist, so callers can rely on one response shape.
    """
    with _latencies_lock:
        durations = [e["duration_ms"] for e in _latencies]
        # Copies, so callers cannot mutate the stored samples.
        recent = [dict(e) for e in _latencies[-10:]]
    if not durations:
        return {"count": 0, "average_ms": 0, "min_ms": 0, "max_ms": 0, "recent": []}
    return {
        "count": len(durations),
        "average_ms": round(sum(durations) / len(durations), 2),
        "min_ms": round(min(durations), 2),
        "max_ms": round(max(durations), 2),
        "recent": recent,
    }
class BridgeHandler(BaseHTTPRequestHandler):
"""HTTP handler for multi-user bridge."""
def do_GET(self):
    """Serve the bridge's read-only JSON endpoints.

    Routes are matched on the URL path only, so a query string no longer
    prevents a match (``.../events?since=...`` previously fell through to
    404, and the notifications user id swallowed ``?mark_read=...``). Room
    names and user ids taken from the path are percent-decoded, so
    ``The%20Threshold`` resolves to the room ``The Threshold``.

    Unknown paths get a 404 JSON error.
    """
    from urllib.parse import parse_qs, unquote, urlsplit

    url = urlsplit(self.path)
    path = url.path
    query = parse_qs(url.query)
    # e.g. ['', 'bridge', 'room', '<room>', 'players']
    parts = path.split('/')
    state_file = WORLD_DIR / 'world_state.json'

    def load_world_state() -> dict:
        # A missing file is treated as an empty world.
        return json.loads(state_file.read_text()) if state_file.exists() else {}

    if path == '/bridge/health':
        ws = load_world_state()
        self._json_response({
            "status": "ok",
            "active_sessions": session_manager.get_session_count(),
            "tick": ws.get("tick", 0),
            "time_of_day": ws.get("time_of_day"),
            "weather": ws.get("weather"),
            "world_tick_running": world_tick_system._running,
            "timestamp": datetime.now().isoformat(),
        })
    elif path == '/bridge/sessions':
        self._json_response({
            "sessions": session_manager.list_sessions(),
        })
    elif path.startswith('/bridge/world/'):
        # GET /bridge/world/<room>
        room = unquote(path.split('/bridge/world/')[-1])
        room_data = load_world_state().get('rooms', {}).get(room, {})
        self._json_response({"room": room, "data": room_data})
    elif path.startswith('/bridge/room/') and path.endswith('/players'):
        # GET /bridge/room/<room>/players
        room = unquote(parts[3])
        players = presence_manager.get_players_in_room(room)
        self._json_response({"room": room, "players": players})
    elif path.startswith('/bridge/room/') and path.endswith('/events'):
        # GET /bridge/room/<room>/events[?since=<timestamp>]
        room = unquote(parts[3])
        since = query.get('since', [None])[0]
        events = presence_manager.get_room_events(room, since)
        self._json_response({"room": room, "events": events})
    elif path == '/bridge/world-state':
        # GET /bridge/world-state — full world state
        world_state = load_world_state()
        # Build room player lists and conversation counts
        room_details = {}
        for room_name, room_data in world_state.get('rooms', {}).items():
            players = presence_manager.get_players_in_room(room_name)
            # Count messages across all sessions in this room
            with session_manager._lock:
                conv_count = sum(
                    len(s.messages) for s in session_manager.sessions.values()
                    if s.room == room_name
                )
            room_details[room_name] = {
                **room_data,
                "players": players,
                "timmy_conversation_count": conv_count,
            }
        self._json_response({
            "world": world_state,
            "rooms": room_details,
            "active_sessions": session_manager.list_sessions(),
        })
    elif path == '/bridge/latency':
        # GET /bridge/latency — latency stats
        self._json_response(get_latency_stats())
    elif path == '/bridge/stats':
        # GET /bridge/stats — aggregate stats
        # Both shared dicts are read under their own lock: other request
        # threads may be adding sessions or rooms while we iterate.
        with session_manager._lock:
            total_messages = sum(
                len(s.messages) for s in session_manager.sessions.values()
            )
        with presence_manager._lock:
            # Rooms with at least one player
            rooms_with_players = [
                room for room, users in presence_manager._rooms.items() if users
            ]
        uptime_seconds = time.time() - _server_start_time
        self._json_response({
            "total_sessions": session_manager.get_session_count(),
            "total_messages": total_messages,
            "rooms_with_players": rooms_with_players,
            "rooms_with_players_count": len(rooms_with_players),
            "uptime_seconds": round(uptime_seconds, 1),
            "timestamp": datetime.now().isoformat(),
        })
    elif path.startswith('/bridge/notifications/'):
        # GET /bridge/notifications/<user_id>[?mark_read=false]
        user_id = unquote(path.split('/bridge/notifications/')[-1].rstrip('/'))
        mark_read = query.get('mark_read', ['true'])[0].lower() != 'false'
        notifications = notification_manager.get_pending(user_id, mark_read=mark_read)
        self._json_response({
            "user_id": user_id,
            "notifications": notifications,
            "unread_count": notification_manager.get_unread_count(user_id),
        })
    elif path.startswith('/bridge/session/') and path.endswith('/summary'):
        # GET /bridge/session/<user_id>/summary
        user_id = unquote(parts[3])
        # Hold the lock only for the lookup; the summary itself is an LLM
        # call and must not block every other session.
        with session_manager._lock:
            session = session_manager.sessions.get(user_id)
        if session:
            summary = session.get_summary()
            self._json_response({
                "user_id": user_id,
                "username": session.username,
                "room": session.room,
                "message_count": len(session.messages),
                "summary": summary,
            })
        else:
            # Check saved summaries in JSONL; the last matching line wins,
            # i.e. the most recently appended summary for this user.
            saved = None
            if session_manager._summaries_path.exists():
                for line in session_manager._summaries_path.read_text().splitlines():
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if isinstance(entry, dict) and entry.get("user_id") == user_id:
                        saved = entry
            if saved:
                self._json_response(saved)
            else:
                self._json_response({"error": "no session or saved summary"}, 404)
    else:
        self._json_response({"error": "not found"}, 404)
def do_POST(self):
    """Handle the bridge's state-changing JSON endpoints.

    Endpoints: ``/bridge/chat``, ``/bridge/move``, ``/bridge/say`` and
    ``/bridge/command``. The request body must be a JSON object; a
    malformed body, a non-object body or an invalid Content-Length gets a
    400 JSON error instead of an unhandled exception that would close the
    connection without a response.
    """
    try:
        content_length = int(self.headers.get('Content-Length', 0))
        # "> 0": a negative length would make read() block until EOF.
        body = json.loads(self.rfile.read(content_length)) if content_length > 0 else {}
    except ValueError:
        # Covers a non-numeric Content-Length, json.JSONDecodeError and
        # UnicodeDecodeError (both are ValueError subclasses).
        self._json_response({"error": "invalid JSON body"}, 400)
        return
    if not isinstance(body, dict):
        self._json_response({"error": "JSON body must be an object"}, 400)
        return

    if self.path == '/bridge/chat':
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        message = body.get('message', '')
        room = body.get('room', 'The Threshold')
        if not message:
            self._json_response({"error": "no message"}, 400)
            return
        session = session_manager.get_or_create(user_id, username, room)
        # Track presence — enter room if not already there
        already_present = any(
            p["user_id"] == user_id
            for p in presence_manager.get_players_in_room(room)
        )
        if not already_present:
            enter_event = presence_manager.enter_room(user_id, username, room)
            # Auto-notify: new player joined
            if enter_event:
                notification_manager.broadcast_room(
                    room, "player_join",
                    f"{username} has entered {room}.",
                    exclude_user=user_id, data=enter_event)
            # Plugin hook: on_join
            plugin_msg = plugin_registry.fire_on_join(user_id, room)
            if plugin_msg:
                notification_manager.notify(user_id, "plugin", plugin_msg, room=room, username=username)
        # Plugin hook: on_message (can override response)
        plugin_override = plugin_registry.fire_on_message(user_id, message, room)
        if plugin_override is not None:
            response = plugin_override
        else:
            response = session.chat(message)
        # Auto-notify: crisis detection — scan response for crisis protocol keywords
        crisis_keywords = ["988", "741741", "safe right now", "crisis", "Crisis Text Line"]
        if any(kw in response for kw in crisis_keywords):
            notification_manager.notify(
                user_id, "crisis",
                "Crisis resources have been provided. You are not alone.",
                room=room, username=username,
                data={"response_contains_crisis_protocol": True})
            # NOTE(review): intended for admins, but broadcast_room sends
            # this to every other player in the room. Confirm that
            # disclosing a user's crisis to bystanders is acceptable.
            notification_manager.broadcast_room(
                room, "crisis_alert",
                f"Crisis detected for {username} in {room}.",
                exclude_user=user_id,
                data={"triggered_by": user_id, "room": room})
        self._json_response({
            "response": response,
            "user": username,
            "room": room,
            "session_messages": len(session.messages),
        })
    elif self.path == '/bridge/move':
        user_id = body.get('user_id')
        new_room = body.get('room')
        if not new_room:
            # Without this check the session's room would be set to None.
            self._json_response({"error": "no room"}, 400)
            return
        result = None
        with session_manager._lock:
            session = session_manager.sessions.get(user_id)
            if session is not None:
                old_room = session.room
                # Leave old room
                leave_event = presence_manager.leave_room(user_id, old_room)
                # Auto-notify: player left
                if leave_event:
                    notification_manager.broadcast_room(
                        old_room, "player_leave",
                        f"{session.username} has left {old_room}.",
                        exclude_user=user_id, data=leave_event)
                # Plugin hook: on_leave
                plugin_leave = plugin_registry.fire_on_leave(user_id, old_room)
                if plugin_leave:
                    notification_manager.broadcast_room(
                        old_room, "plugin", plugin_leave,
                        exclude_user=user_id)
                # Enter new room
                session.room = new_room
                enter_event = presence_manager.enter_room(user_id, session.username, new_room)
                # Auto-notify: player joined
                if enter_event:
                    notification_manager.broadcast_room(
                        new_room, "player_join",
                        f"{session.username} has entered {new_room}.",
                        exclude_user=user_id, data=enter_event)
                result = {
                    "ok": True,
                    "room": new_room,
                    "events": [e for e in [leave_event, enter_event] if e],
                }
        # Respond after releasing the lock so a slow client cannot stall
        # session management for everyone else.
        if result is not None:
            self._json_response(result)
        else:
            self._json_response({"error": "no session"}, 404)
    elif self.path == '/bridge/say':
        # POST /bridge/say — user says something visible to all in room
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        message = body.get('message', '')
        room = body.get('room', 'The Threshold')
        if not message:
            self._json_response({"error": "no message"}, 400)
            return
        event = presence_manager.say(user_id, username, room, message)
        # Get list of players who should see it
        players = presence_manager.get_players_in_room(room)
        self._json_response({
            "ok": True,
            "event": event,
            "recipients": players,
        })
    elif self.path == '/bridge/command':
        # POST /bridge/command — parse MUD-style commands
        # Body: { user_id, username, room, command }
        # Commands: look, go <dir>, examine <obj>, say <msg>, ask <msg>
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        raw_command = body.get('command', '').strip()
        if not raw_command:
            self._json_response({"error": "no command"}, 400)
            return
        result = self._parse_mud_command(user_id, username, room, raw_command)
        self._json_response(result)
    else:
        self._json_response({"error": "not found"}, 404)
def _parse_mud_command(self, user_id: str, username: str, room: str, command: str) -> dict:
"""Parse and execute a MUD-style command."""
parts = command.split(None, 1)
verb = parts[0].lower() if parts else ''
arg = parts[1].strip() if len(parts) > 1 else ''
state_file = WORLD_DIR / 'world_state.json'
world_state = json.loads(state_file.read_text()) if state_file.exists() else {}
rooms = world_state.get('rooms', {})
room_data = rooms.get(room, {})
if verb == 'look':
# Return room description and scene
desc = room_data.get('description_base', 'An empty room.')
desc_dynamic = room_data.get('description_dynamic', '')
objects = room_data.get('objects', [])
players = presence_manager.get_players_in_room(room)
scene = [desc]
if desc_dynamic:
scene.append(desc_dynamic)
if objects:
scene.append(f"You see: {', '.join(objects)}.")
if players:
names = [p['username'] for p in players if p['user_id'] != user_id]
if names:
scene.append(f"Also here: {', '.join(names)}.")
# Build exits from description_base
exits = self._extract_exits(room_data.get('description_base', ''))
if exits:
scene.append(f"Exits: {', '.join(exits.keys())}.")
return {
"command": "look",
"room": room,
"description": "\n".join(scene),
"objects": objects,
"exits": exits,
"players": players,
}
elif verb == 'go':
if not arg:
return {"command": "go", "error": "Go where? Specify a direction."}
direction = arg.lower()
exits = self._extract_exits(room_data.get('description_base', ''))
# Match direction
dest = None
for dir_name, dest_room in exits.items():
if dir_name.lower().startswith(direction) or direction in dir_name.lower():
dest = dest_room
break
if not dest:
return {"command": "go", "error": f"You can't go '{arg}' from here. Exits: {', '.join(exits.keys()) if exits else 'none'}."}
if dest not in rooms:
return {"command": "go", "error": f"Unknown destination: {dest}."}
# Move user — auto-notify
leave_event = presence_manager.leave_room(user_id, room)
if leave_event:
notification_manager.broadcast_room(
room, "player_leave",
f"{username} has left {room}.",
exclude_user=user_id, data=leave_event)
enter_event = presence_manager.enter_room(user_id, username, dest)
if enter_event:
notification_manager.broadcast_room(
dest, "player_join",
f"{username} has entered {dest}.",
exclude_user=user_id, data=enter_event)
# Update session room
with session_manager._lock:
if user_id in session_manager.sessions:
session_manager.sessions[user_id].room = dest
# Return new room look
new_room_data = rooms[dest]
desc = new_room_data.get('description_base', 'An empty room.')
desc_dynamic = new_room_data.get('description_dynamic', '')
objects = new_room_data.get('objects', [])
players = presence_manager.get_players_in_room(dest)
scene = [f"You go {arg}.\n"]
scene.append(desc)
if desc_dynamic:
scene.append(desc_dynamic)
if objects:
scene.append(f"You see: {', '.join(objects)}.")
new_exits = self._extract_exits(desc)
if new_exits:
scene.append(f"Exits: {', '.join(new_exits.keys())}.")
return {
"command": "go",
"direction": arg,
"room": dest,
"description": "\n".join(scene),
"exits": new_exits,
}
elif verb == 'examine':
if not arg:
return {"command": "examine", "error": "Examine what?"}
objects = room_data.get('objects', [])
# Fuzzy match object
target = arg.lower()
matched = None
for obj in objects:
if target in obj.lower() or obj.lower() in target:
matched = obj
break
if not matched:
return {"command": "examine", "error": f"You don't see '{arg}' here. Objects: {', '.join(objects) if objects else 'none'}."}
# Check whiteboard
if matched == 'whiteboard':
wb = room_data.get('whiteboard', [])
wb_text = "\n".join(f" - {w}" for w in wb) if wb else "(empty)"
return {"command": "examine", "object": matched, "description": f"The whiteboard reads:\n{wb_text}"}
# Default descriptions for known objects
descriptions = {
"stone floor": "Smooth stone, worn by countless footsteps.",
"doorframe": "An archway of fitted stone, humming faintly with energy.",
"server racks": "Wrought-iron racks filled with humming servers. Green LEDs blink in the dim light.",
"green LED": "A single green LED, pulsing steadily. Heartbeat.",
"cot": "A simple cot in the corner. Someone sleeps here sometimes.",
"anvil": "A heavy iron anvil, scarred and dented from years of use.",
"hammer": "A well-worn hammer, its handle smooth from use.",
"tongs": "Iron tongs, blackened by the forge.",
"hearth": "The hearth glows with banked coals. Warmth radiates outward.",
"tools": "Hammers, tongs, chisels — all well-used.",
"stone bench": "A cool stone bench under the oak tree. A good place to sit.",
"oak tree": "An old oak tree, its branches spreading wide over the garden.",
"herbs": "Lavender, rosemary, thyme — fragrant and green.",
"wildflowers": "Small flowers in purple, yellow, and white.",
"railing": "The bridge railing is worn smooth. Carvings mark the stone.",
"dark water": "Dark water flows beneath the bridge. You cannot see the bottom.",
}
desc = descriptions.get(matched, f"You look at the {matched}. It seems ordinary enough.")
return {"command": "examine", "object": matched, "description": desc}
elif verb == 'say':
if not arg:
return {"command": "say", "error": "Say what?"}
event = presence_manager.say(user_id, username, room, arg)
players = presence_manager.get_players_in_room(room)
return {
"command": "say",
"message": arg,
"room": room,
"event": event,
"recipients": players,
}
elif verb == 'ask':
if not arg:
return {"command": "ask", "error": "Ask what?"}
# Forward to chat endpoint logic
session = session_manager.get_or_create(user_id, username, room)
if not presence_manager.get_players_in_room(room) or \
not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)):
presence_manager.enter_room(user_id, username, room)
response = session.chat(arg)
return {
"command": "ask",
"message": arg,
"room": room,
"response": response,
"session_messages": len(session.messages),
}
else:
return {
"command": verb,
"error": f"Unknown command: '{verb}'. Try: look, go <dir>, examine <obj>, say <msg>, ask <msg>",
}
def _extract_exits(self, description: str) -> dict:
"""Extract exits from room description like 'North to the Tower. East to the Garden.'"""
import re
exits = {}
# Match patterns like "North to the Tower", "East to the Garden"
pattern = r'(North|South|East|West|Up|Down|Northeast|Northwest|Southeast|Southwest)\s+to\s+(?:the\s+)?([A-Z][a-zA-Z\s]+?)(?:\.|$)'
for match in re.finditer(pattern, description):
direction = match.group(1)
destination = match.group(2).strip()
# Reconstruct full room name
room_name = destination if destination.startswith('The ') else f"The {destination}"
exits[direction] = room_name
return exits
def _json_response(self, data: dict, code: int = 200):
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def log_message(self, format, *args):
    """Suppress HTTP logs: accept the log call and emit nothing."""
    return None
# ── World Tick System ──────────────────────────────────────────────────
import random
class WorldTickSystem:
"""Advances the world state every TICK_INTERVAL seconds.
On each tick:
- Increment tick counter
- Cycle time_of_day (dawn -> morning -> midday -> afternoon -> dusk -> night)
- Randomly change weather (clear, cloudy, foggy, light rain, heavy rain, storm)
- Evolve room states:
* The Forge: fire decays (blazing -> burning -> glowing -> embers -> cold)
* The Garden: growth advances (seeds -> sprouts -> growing -> blooming -> harvest)
* The Bridge: rain toggles
- Broadcast world events to connected players
- Save world state to disk
"""
TICK_INTERVAL = 60 # seconds
TIME_CYCLE = ["dawn", "morning", "midday", "afternoon", "dusk", "night"]
WEATHER_OPTIONS = [
"clear", "clear", "clear", # weighted toward clear
"cloudy", "cloudy",
"foggy",
"light rain",
"heavy rain",
"storm",
]
FIRE_STAGES = ["cold", "embers", "glowing", "burning", "blazing"]
GROWTH_STAGES = ["seeds", "sprouts", "growing", "blooming", "harvest"]
def __init__(self):
self._thread: threading.Thread | None = None
self._running = False
self._state_lock = threading.Lock()
# ── lifecycle ───────────────────────────────────────────────────────
def start(self):
"""Start the tick loop in a daemon thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._loop, daemon=True, name="world-tick")
self._thread.start()
print("[WorldTick] Started — ticking every {}s".format(self.TICK_INTERVAL))
def stop(self):
self._running = False
# ── internal loop ───────────────────────────────────────────────────
def _loop(self):
while self._running:
time.sleep(self.TICK_INTERVAL)
try:
self._tick()
except Exception as e:
print(f"[WorldTick] Error during tick: {e}")
def _tick(self):
"""Execute one world tick."""
state_file = WORLD_DIR / 'world_state.json'
if state_file.exists():
state = json.loads(state_file.read_text())
else:
state = {"tick": 0, "time_of_day": "morning", "weather": "clear", "rooms": {}}
events: list[str] = []
with self._state_lock:
# 1. Advance tick counter
state["tick"] = state.get("tick", 0) + 1
tick_num = state["tick"]
# 2. Time of day — advance every tick
current_time = state.get("time_of_day", "morning")
try:
idx = self.TIME_CYCLE.index(current_time)
except ValueError:
idx = 0
next_idx = (idx + 1) % len(self.TIME_CYCLE)
new_time = self.TIME_CYCLE[next_idx]
state["time_of_day"] = new_time
if new_time == "dawn":
events.append("The sun rises over The Tower. A new day begins.")
elif new_time == "dusk":
events.append("The sky darkens. Dusk settles over the world.")
elif new_time == "night":
events.append("Night falls. The green LED in The Tower pulses in the dark.")
# 3. Weather — change every 3 ticks
if tick_num % 3 == 0:
old_weather = state.get("weather", "clear")
new_weather = random.choice(self.WEATHER_OPTIONS)
state["weather"] = new_weather
if new_weather != old_weather:
events.append(f"The weather shifts to {new_weather}.")
# 4. Room states
rooms = state.get("rooms", {})
# The Forge — fire decays one stage every 2 ticks unless rekindled
forge = rooms.get("The Forge", {})
if forge:
fire = forge.get("fire_state", "cold")
untouched = forge.get("fire_untouched_ticks", 0) + 1
forge["fire_untouched_ticks"] = untouched
if untouched >= 2 and fire != "cold":
try:
fi = self.FIRE_STAGES.index(fire)
except ValueError:
fi = 0
if fi > 0:
forge["fire_state"] = self.FIRE_STAGES[fi - 1]
forge["fire_untouched_ticks"] = 0
if forge["fire_state"] == "embers":
events.append("The forge fire has died down to embers.")
elif forge["fire_state"] == "cold":
events.append("The forge fire has gone cold.")
rooms["The Forge"] = forge
# The Garden — growth advances one stage every 4 ticks
garden = rooms.get("The Garden", {})
if garden:
if tick_num % 4 == 0:
growth = garden.get("growth_stage", "seeds")
try:
gi = self.GROWTH_STAGES.index(growth)
except ValueError:
gi = 0
if gi < len(self.GROWTH_STAGES) - 1:
garden["growth_stage"] = self.GROWTH_STAGES[gi + 1]
events.append(f"The garden has advanced to {garden['growth_stage']}.")
else:
# Reset cycle
garden["growth_stage"] = "seeds"
events.append("The garden has been harvested. New seeds are planted.")
rooms["The Garden"] = garden
# The Bridge — rain state follows weather
bridge = rooms.get("The Bridge", {})
if bridge:
weather = state.get("weather", "clear")
is_raining = weather in ("light rain", "heavy rain", "storm")
was_raining = bridge.get("rain_active", False)
bridge["rain_active"] = is_raining
if is_raining and not was_raining:
events.append("Rain begins to fall on The Bridge.")
elif not is_raining and was_raining:
events.append("The rain on The Bridge lets up.")
rooms["The Bridge"] = bridge
state["rooms"] = rooms
state["last_updated"] = datetime.now().isoformat()
# 5. Save world state
state_file.write_text(json.dumps(state, indent=2))
# 6. Broadcast events to all occupied rooms (outside lock)
if events:
event_text = " ".join(events)
for room_name in list(presence_manager._rooms.keys()):
players = presence_manager.get_players_in_room(room_name)
if players:
notification_manager.broadcast_room(
room_name, "world_tick", event_text,
data={"tick": state["tick"], "time_of_day": state["time_of_day"],
"weather": state.get("weather"), "events": events},
)
print(f"[WorldTick] tick={state['tick']} time={state['time_of_day']} "
f"weather={state.get('weather')} events={len(events)}")
# Shared module-level instance; main() starts its background tick thread.
world_tick_system = WorldTickSystem()
# ── Main ───────────────────────────────────────────────────────────────
def main():
    """Print the endpoint summary, start the world tick, and serve HTTP.

    Blocks in ``serve_forever()`` until interrupted. On the way out the
    world tick is stopped and the listening socket is closed.
    """
    # Imported locally so this function does not depend on a module-level
    # server name. The threading server handles each request in its own
    # thread, so one player's slow AI reply does not block everyone else.
    from http.server import ThreadingHTTPServer

    print(f"Multi-User AI Bridge starting on {BRIDGE_HOST}:{BRIDGE_PORT}")
    print(f"World dir: {WORLD_DIR}")
    print(f"Max sessions: {session_manager.max_sessions}")
    print()
    print("Endpoints:")
    endpoint_lines = (
        " GET /bridge/health — Health check",
        " GET /bridge/sessions — List active sessions",
        " GET /bridge/world-state — Full world state (rooms, players, convos)",
        " GET /bridge/stats — Aggregate stats (sessions, messages, uptime)",
        " GET /bridge/room/<room>/players — List players in a room",
        " GET /bridge/room/<room>/events — Room events (presence + chat)",
        " GET /bridge/session/<id>/summary — Get conversation summary",
        " POST /bridge/chat — Send message to Timmy",
        " POST /bridge/say — Say something to room (visible to all)",
        " POST /bridge/move — Move user to room (triggers presence)",
        " POST /bridge/command — MUD command parser (look, go, examine, say, ask)",
    )
    for line in endpoint_lines:
        print(line)
    print()

    # Start world tick system
    world_tick_system.start()
    server = ThreadingHTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.")
    finally:
        world_tick_system.stop()
        server.server_close()
# Run the bridge only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()