Files
the-nexus/multi_user_bridge.py
Alexander Whitestone dfbd96f792
Some checks failed
Deploy Nexus / deploy (push) Failing after 5s
Staging Verification Gate / verify-staging (push) Failing after 7s
Merge pull request 'fix: ChatLog.log() crash — CHATLOG_FILE defined after use (#1349)' (#1390) from burn/1349-1776125702 into main
2026-04-14 00:38:01 +00:00

2889 lines
121 KiB
Python

#!/usr/bin/env python3
"""
Multi-User AI Bridge for Evennia MUD.
Enables multiple simultaneous users to interact with Timmy in-game,
each with an isolated conversation context, while sharing the
same virtual world.
Architecture:
User A ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_a)
User B ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_b)
User C ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_c)
Each user gets their own AIAgent instance with:
- Isolated conversation history
- Shared world state (room, other players, objects)
- Per-user session memory
The bridge runs as an HTTP server alongside Evennia.
Evennia commands call the bridge to get Timmy's responses.
"""
import json
import time
import threading
import signal
import hashlib
import os
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from pathlib import Path
from datetime import datetime
from typing import Optional
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server variant that handles each request in its own thread."""

    # Per-request worker threads are created as daemon threads.
    daemon_threads = True
# ── Plugin System ──────────────────────────────────────────────────────
class Plugin:
"""Base class for bridge plugins. Override methods to add game mechanics."""
name: str = "unnamed"
description: str = ""
def on_message(self, user_id: str, message: str, room: str) -> str | None:
"""Called on every chat message. Return a string to override the response,
or None to let the normal AI handle it."""
return None
def on_join(self, user_id: str, room: str) -> str | None:
"""Called when a player joins a room. Return a message to broadcast, or None."""
return None
def on_leave(self, user_id: str, room: str) -> str | None:
"""Called when a player leaves a room. Return a message to broadcast, or None."""
return None
def on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
"""Called on MUD commands. Return a result dict to override command handling,
or None to let the default parser handle it."""
return None
class PluginRegistry:
"""Registry that manages plugin lifecycle and dispatches hooks."""
def __init__(self):
self._plugins: dict[str, Plugin] = {}
self._lock = threading.Lock()
def register(self, plugin: Plugin):
"""Register a plugin by its name."""
with self._lock:
self._plugins[plugin.name] = plugin
print(f"[PluginRegistry] Registered plugin: {plugin.name}")
def unregister(self, name: str) -> bool:
"""Unregister a plugin by name."""
with self._lock:
if name in self._plugins:
del self._plugins[name]
print(f"[PluginRegistry] Unregistered plugin: {name}")
return True
return False
def get(self, name: str) -> Plugin | None:
"""Get a plugin by name."""
return self._plugins.get(name)
def list_plugins(self) -> list[dict]:
"""List all registered plugins."""
return [
{"name": p.name, "description": p.description}
for p in self._plugins.values()
]
def fire_on_message(self, user_id: str, message: str, room: str) -> str | None:
"""Fire on_message hooks. First non-None return wins."""
for plugin in self._plugins.values():
result = plugin.on_message(user_id, message, room)
if result is not None:
return result
return None
def fire_on_join(self, user_id: str, room: str) -> str | None:
"""Fire on_join hooks. Collect all non-None returns."""
messages = []
for plugin in self._plugins.values():
result = plugin.on_join(user_id, room)
if result is not None:
messages.append(result)
return "\n".join(messages) if messages else None
def fire_on_leave(self, user_id: str, room: str) -> str | None:
"""Fire on_leave hooks. Collect all non-None returns."""
messages = []
for plugin in self._plugins.values():
result = plugin.on_leave(user_id, room)
if result is not None:
messages.append(result)
return "\n".join(messages) if messages else None
def fire_on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
"""Fire on_command hooks. First non-None return wins."""
for plugin in self._plugins.values():
result = plugin.on_command(user_id, command, args, room)
if result is not None:
return result
return None
def load_from_directory(self, plugin_dir: str):
"""Auto-load all Python files in a directory as plugins."""
plugin_path = Path(plugin_dir)
if not plugin_path.is_dir():
print(f"[PluginRegistry] Plugin directory not found: {plugin_dir}")
return
for py_file in plugin_path.glob("*.py"):
if py_file.name.startswith("_"):
continue
try:
module_name = f"plugins.{py_file.stem}"
spec = __import__("importlib").util.spec_from_file_location(module_name, py_file)
module = __import__("importlib").util.module_from_spec(spec)
spec.loader.exec_module(module)
# Look for Plugin subclasses in the module
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (isinstance(attr, type)
and issubclass(attr, Plugin)
and attr is not Plugin):
plugin_instance = attr()
self.register(plugin_instance)
except Exception as e:
print(f"[PluginRegistry] Failed to load {py_file.name}: {e}")
# Module-level registry instance; it starts empty until plugins are
# registered or loaded from a directory.
plugin_registry = PluginRegistry()
# ── Configuration ──────────────────────────────────────────────────────
# Bridge port, overridable via the TIMMY_BRIDGE_PORT environment variable.
BRIDGE_PORT = int(os.environ.get('TIMMY_BRIDGE_PORT', 4004))
# Bridge host, overridable via TIMMY_BRIDGE_HOST; defaults to loopback.
BRIDGE_HOST = os.environ.get('TIMMY_BRIDGE_HOST', '127.0.0.1')
# Path under the user's home; presumably the Hermes agent checkout — TODO confirm.
HERMES_PATH = os.path.expanduser('~/.hermes/hermes-agent')
# Base directory for the state files defined relative to it in this module.
WORLD_DIR = Path(os.path.expanduser('~/.timmy/evennia/timmy_world'))
# NOTE(review): presumably persisted bridge sessions — the reader/writer is not visible here.
SESSIONS_FILE = WORLD_DIR / 'bridge_sessions.json'
# JSONL file that ChatLog.log() appends one entry per line to.
CHATLOG_FILE = WORLD_DIR / 'chat_history.jsonl'
# ── Chat History Log ──────────────────────────────────────────────────
class ChatLog:
    """Rolling per-room chat buffer (say, ask, system) mirrored to a JSONL file.

    The in-memory buffer keeps at most ``max_per_room`` entries per room; the
    JSONL file receives every entry on a best-effort basis.
    """

    def __init__(self, max_per_room: int = 50):
        self._max_per_room = max_per_room
        self._lock = threading.Lock()
        # room -> list of {type, user_id, username, message, timestamp}
        self._history: dict[str, list[dict]] = {}

    def log(self, room: str, msg_type: str, message: str,
            user_id: str = None, username: str = None, data: dict = None) -> dict:
        """Record one chat message and return the stored entry.

        ``msg_type`` is one of 'say', 'ask', 'system'. The entry is appended
        to the room's buffer and then written to CHATLOG_FILE; a failed write
        is reported on stdout and otherwise ignored.
        """
        record = {
            "type": msg_type,
            "user_id": user_id,
            "username": username,
            "message": message,
            "room": room,
            "timestamp": datetime.now().isoformat(),
            "data": data or {},
        }

        with self._lock:
            bucket = self._history.setdefault(room, [])
            bucket.append(record)
            if len(bucket) > self._max_per_room:
                # Keep only the newest entries for this room.
                self._history[room] = bucket[-self._max_per_room:]

        # Persist to JSONL
        try:
            CHATLOG_FILE.parent.mkdir(parents=True, exist_ok=True)
            with open(CHATLOG_FILE, 'a') as sink:
                sink.write(json.dumps(record) + '\n')
        except Exception as e:
            print(f"[ChatLog] Persist failed: {e}")

        return record

    def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]:
        """Return buffered messages for ``room``, oldest first.

        Args:
            room: Room name.
            limit: Max messages to return (default 50, rolling buffer size).
                A falsy or non-positive value disables the cap.
            since: ISO timestamp — only messages strictly after it are kept.
        """
        with self._lock:
            snapshot = list(self._history.get(room, []))

        if since:
            snapshot = [item for item in snapshot if item["timestamp"] > since]

        wants_cap = bool(limit) and limit > 0
        return snapshot[-limit:] if wants_cap else snapshot

    def get_all_rooms(self) -> list[str]:
        """Names of every room that currently has buffered chat history."""
        with self._lock:
            return [room for room in self._history]
# Module-level ChatLog instance, using the default cap of 50 messages per room.
chat_log = ChatLog()
# ── Crisis Protocol ────────────────────────────────────────────────────
# Prompt-style instruction text describing the required crisis response.
# NOTE(review): this string is rebound by the list assignment further down, so
# after import CRISIS_PROTOCOL is the list, not this text — confirm which form
# the consumers of CRISIS_PROTOCOL expect and give the other a distinct name.
CRISIS_PROTOCOL = """
CRITICAL: If someone expresses crisis signals (suicide, self-harm, hopelessness,
desperation, being tired of living), you MUST respond with ALL of the following
in your FIRST response — do NOT wait for follow-up messages:
1. "Are you safe right now?" — ALWAYS ask this first
2. 988 Suicide & Crisis Lifeline — call or text 988 (US)
3. Crisis Text Line — text HOME to 741741
4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear,
2 you smell, 1 you taste
Never compute the value of a human life. Never suggest someone should die.
Be present. Be in the room. That's enough.
"""
# ── Crisis Protocol ────────────────────────────────────────────────────
# Second binding of the same name: the four response elements as a list.
# NOTE(review): the grounding exercise here says "4 you hear, 3 you touch",
# while the text above says "4 you can touch, 3 you hear" — reconcile the two.
CRISIS_PROTOCOL = [
"Are you safe right now?",
"988 Suicide & Crisis Lifeline — call or text 988",
"Crisis Text Line — text HOME to 741741",
"Grounding exercise: Name 5 things you see, 4 you hear, 3 you touch, 2 you smell, 1 you taste",
]
# ── Presence Tracking ──────────────────────────────────────────────────
class PresenceManager:
"""Tracks which users are in which rooms and broadcasts presence events."""
def __init__(self):
# room_name -> set of user_id
self._rooms: dict[str, set[str]] = {}
# user_id -> username
self._usernames: dict[str, str] = {}
# room_name -> list of event dicts (recent chat & presence events)
self._room_events: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self._max_events_per_room = 50
def enter_room(self, user_id: str, username: str, room: str) -> dict:
"""Record user entering a room. Returns enter event."""
with self._lock:
if room not in self._rooms:
self._rooms[room] = set()
self._room_events[room] = []
self._rooms[room].add(user_id)
self._usernames[user_id] = username
event = {
"type": "presence",
"event": "enter",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def leave_room(self, user_id: str, room: str) -> dict | None:
"""Record user leaving a room. Returns leave event or None."""
with self._lock:
if room in self._rooms and user_id in self._rooms[room]:
self._rooms[room].discard(user_id)
username = self._usernames.get(user_id, user_id)
event = {
"type": "presence",
"event": "leave",
"user_id": user_id,
"username": username,
"room": room,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
return None
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
"""Record a chat message in a room. Returns say event."""
with self._lock:
if room not in self._room_events:
self._room_events[room] = []
event = {
"type": "say",
"event": "message",
"user_id": user_id,
"username": username,
"room": room,
"message": message,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def get_players_in_room(self, room: str) -> list[dict]:
"""List players currently in a room."""
with self._lock:
user_ids = self._rooms.get(room, set())
return [
{"user_id": uid, "username": self._usernames.get(uid, uid)}
for uid in user_ids
]
def get_room_events(self, room: str, since: str | None = None) -> list[dict]:
"""Get recent events for a room, optionally since a timestamp."""
with self._lock:
events = self._room_events.get(room, [])
if since:
return [e for e in events if e["timestamp"] > since]
return list(events)
def cleanup_user(self, user_id: str) -> list[dict]:
"""Remove user from all rooms, returning leave events."""
events = []
with self._lock:
rooms_to_clean = [
room for room, users in self._rooms.items() if user_id in users
]
for room in rooms_to_clean:
ev = self.leave_room(user_id, room)
if ev:
events.append(ev)
return events
def _append_event(self, room: str, event: dict):
self._room_events[room].append(event)
if len(self._room_events[room]) > self._max_events_per_room:
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
# ── Notification System ────────────────────────────────────────────────
class NotificationManager:
    """Per-user notification queue with broadcast support.

    Each user's queue is capped at ``max_per_user`` entries (oldest dropped).
    Queue contents and the id counter are both guarded by ``self._lock``.
    """

    def __init__(self, max_per_user: int = 100):
        # user_id -> list of notification dicts
        self._queues: dict[str, list[dict]] = {}
        self._lock = threading.Lock()
        self._max_per_user = max_per_user
        self._counter = 0

    def notify(self, user_id: str, ntype: str, message: str,
               room: str = None, data: dict = None, username: str = None) -> dict:
        """Queue a notification for a specific user and return it."""
        with self._lock:
            # The id is allocated under the lock so concurrent notify() calls
            # can never observe the same counter value.
            notification = {
                "id": self._next_id(),
                "type": ntype,
                "message": message,
                "user_id": user_id,
                "username": username,
                "room": room,
                "data": data or {},
                "timestamp": datetime.now().isoformat(),
                "read": False,
            }
            queue = self._queues.setdefault(user_id, [])
            queue.append(notification)
            if len(queue) > self._max_per_user:
                self._queues[user_id] = queue[-self._max_per_user:]
        return notification

    def broadcast(self, user_ids: list[str], ntype: str, message: str,
                  room: str = None, data: dict = None) -> list[dict]:
        """Send a notification to multiple users; returns one dict per user."""
        return [
            self.notify(uid, ntype, message, room=room, data=data)
            for uid in user_ids
        ]

    def broadcast_room(self, room: str, ntype: str, message: str,
                       exclude_user: str = None, data: dict = None) -> list[dict]:
        """Send a notification to all users in a room (via presence_manager).

        ``exclude_user`` (typically the sender) is skipped. Relies on the
        module-level ``presence_manager`` being defined by call time.
        """
        players = presence_manager.get_players_in_room(room)
        user_ids = [p["user_id"] for p in players if p["user_id"] != exclude_user]
        return self.broadcast(user_ids, ntype, message, room=room, data=data)

    def get_pending(self, user_id: str, mark_read: bool = True) -> list[dict]:
        """Return the user's queued notifications (read and unread alike).

        When ``mark_read`` is true every queued notification is flagged as
        read; the queue itself is not emptied — use clear() for that.
        """
        with self._lock:
            queue = self._queues.get(user_id, [])
            if mark_read:
                for n in queue:
                    n["read"] = True
            return list(queue)

    def clear(self, user_id: str) -> int:
        """Clear all notifications for a user. Returns count cleared."""
        with self._lock:
            return len(self._queues.pop(user_id, []))

    def get_unread_count(self, user_id: str) -> int:
        """Count unread notifications."""
        with self._lock:
            return sum(1 for n in self._queues.get(user_id, []) if not n["read"])

    def _next_id(self) -> str:
        """Allocate the next notification id; caller must hold ``self._lock``."""
        self._counter += 1
        return f"n_{self._counter}"
# ── Quest System ───────────────────────────────────────────────────────
# JSON file QuestManager reads in load() and rewrites in _save().
QUESTS_FILE = WORLD_DIR / 'quests.json'
class Quest:
"""A quest with name, description, objectives, rewards, and status."""
def __init__(self, quest_id: str, name: str, description: str,
objectives: list[str], rewards: list[str],
assigned_to: str | None = None):
self.quest_id = quest_id
self.name = name
self.description = description
self.objectives = objectives # list of objective descriptions
self.rewards = rewards
self.status = "available" # available, active, completed, failed
self.assigned_to = assigned_to # user_id or None
self.completed_objectives: list[int] = [] # indices of completed objectives
self.created_at = datetime.now().isoformat()
self.completed_at: str | None = None
def to_dict(self) -> dict:
return {
"quest_id": self.quest_id,
"name": self.name,
"description": self.description,
"objectives": self.objectives,
"rewards": self.rewards,
"status": self.status,
"assigned_to": self.assigned_to,
"completed_objectives": self.completed_objectives,
"created_at": self.created_at,
"completed_at": self.completed_at,
}
@classmethod
def from_dict(cls, data: dict) -> 'Quest':
q = cls(
data["quest_id"], data["name"], data["description"],
data["objectives"], data["rewards"], data.get("assigned_to"),
)
q.status = data.get("status", "available")
q.completed_objectives = data.get("completed_objectives", [])
q.created_at = data.get("created_at", q.created_at)
q.completed_at = data.get("completed_at")
return q
@property
def is_complete(self) -> bool:
return len(self.completed_objectives) >= len(self.objectives)
class QuestManager:
    """Owns the quest table: create, assign, track objectives, list, persist."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counter = 0
        self._quests: dict[str, Quest] = {}
        self.load()

    def _mint_id(self) -> str:
        """Advance the counter and build the next quest id.

        Callers are expected to hold ``self._lock``.
        """
        self._counter += 1
        return f"q_{self._counter}_{int(time.time())}"

    def create(self, name: str, description: str,
               objectives: list[str], rewards: list[str]) -> Quest:
        """Register a new, unassigned quest, persist the table and return it."""
        with self._lock:
            fresh = Quest(self._mint_id(), name, description, objectives, rewards)
            self._quests[fresh.quest_id] = fresh
            self._save()
            return fresh

    def assign(self, quest_id: str, user_id: str) -> Quest | None:
        """Give ``user_id`` a quest.

        Returns the quest itself when it is already theirs or still available,
        a fresh active copy when another user holds it, and None when the id
        is unknown or the quest is neither available nor held by anyone.
        """
        with self._lock:
            quest = self._quests.get(quest_id)
            if quest is None:
                return None

            holder = quest.assigned_to
            if holder == user_id:
                # Already assigned to this user: nothing to change.
                return quest

            if quest.status == "available":
                quest.assigned_to = user_id
                quest.status = "active"
                self._save()
                return quest

            if not holder or holder == user_id:
                return None

            # Held by someone else: hand this user an independent copy.
            clone = Quest(
                self._mint_id(), quest.name, quest.description,
                list(quest.objectives), list(quest.rewards),
                assigned_to=user_id,
            )
            clone.status = "active"
            self._quests[clone.quest_id] = clone
            self._save()
            return clone

    def complete_objective(self, quest_id: str, user_id: str,
                           objective_index: int) -> dict:
        """Tick off one objective.

        Returns ``{"ok": True, "quest": ...}`` on success or ``{"error": ...}``
        describing the first failed check. Completing the last objective
        marks the quest completed and stamps ``completed_at``.
        """
        with self._lock:
            quest = self._quests.get(quest_id)
            if quest is None:
                return {"error": "Quest not found."}
            if quest.assigned_to != user_id:
                return {"error": "Quest not assigned to you."}
            if quest.status != "active":
                return {"error": f"Quest status is '{quest.status}', not active."}

            total = len(quest.objectives)
            if objective_index < 0 or objective_index >= total:
                return {"error": f"Invalid objective index. Valid: 0-{total - 1}."}
            if objective_index in quest.completed_objectives:
                return {"error": "Objective already completed."}

            quest.completed_objectives.append(objective_index)
            if quest.is_complete:
                quest.status = "completed"
                quest.completed_at = datetime.now().isoformat()
            self._save()
            return {"ok": True, "quest": quest.to_dict()}

    def get_user_quests(self, user_id: str) -> list[dict]:
        """Dicts for every quest that is active and assigned to ``user_id``."""
        with self._lock:
            mine = []
            for quest in self._quests.values():
                if quest.status == "active" and quest.assigned_to == user_id:
                    mine.append(quest.to_dict())
            return mine

    def get_all_quests(self) -> list[dict]:
        """Dicts for every quest, whatever its status."""
        with self._lock:
            return [quest.to_dict() for quest in self._quests.values()]

    def get_quest(self, quest_id: str) -> Quest | None:
        """Look up the live Quest object by id (None if unknown)."""
        with self._lock:
            return self._quests.get(quest_id)

    def get_available_quests(self) -> list[dict]:
        """Dicts for every quest whose status is still 'available'."""
        with self._lock:
            return [
                quest.to_dict()
                for quest in self._quests.values()
                if quest.status == "available"
            ]

    def _save(self):
        """Write counter and quests to QUESTS_FILE; failures are only printed."""
        try:
            serialised = {}
            for qid, quest in self._quests.items():
                serialised[qid] = quest.to_dict()
            payload = {"counter": self._counter, "quests": serialised}
            QUESTS_FILE.parent.mkdir(parents=True, exist_ok=True)
            QUESTS_FILE.write_text(json.dumps(payload, indent=2))
        except Exception as e:
            print(f"[QuestManager] Save failed: {e}")

    def load(self):
        """Read quests from QUESTS_FILE, seeding defaults if missing or unreadable."""
        if not QUESTS_FILE.exists():
            self._seed_default_quests()
            return
        try:
            stored = json.loads(QUESTS_FILE.read_text())
            self._counter = stored.get("counter", 0)
            for qid, raw in stored.get("quests", {}).items():
                self._quests[qid] = Quest.from_dict(raw)
            print(f"[QuestManager] Loaded {len(self._quests)} quests.")
        except Exception as e:
            print(f"[QuestManager] Load failed: {e}")
            self._seed_default_quests()

    def _seed_default_quests(self):
        """Create the three starter quests (each create() call also saves)."""
        starters = (
            (
                "The Green LED's Light",
                "Help keep the green LED in The Tower glowing by visiting it and tending the space.",
                ["Visit The Tower room", "Examine the green LED", "Say something kind to Timmy"],
                ["Timmy's gratitude", "A warm glow in your heart"],
            ),
            (
                "Garden Guardian",
                "The Garden needs tending. Visit it and help the herbs grow.",
                ["Visit The Garden room", "Examine the herbs", "Examine the wildflowers"],
                ["Fresh herbs for the kitchen", "Garden knowledge"],
            ),
            (
                "Forge Keeper",
                "The Forge fire needs watching. Visit and tend the hearth.",
                ["Visit The Forge room", "Examine the hearth", "Examine the anvil"],
                ["Warmth and light", "Blacksmith lore"],
            ),
        )
        for title, blurb, goals, prizes in starters:
            self.create(title, blurb, goals, prizes)
        print(f"[QuestManager] Seeded {len(self._quests)} default quests.")
# Module-level QuestManager; constructing it loads quests from QUESTS_FILE, or
# seeds the default quests when that file is missing or cannot be read.
quest_manager = QuestManager()
# ── Inventory System ──────────────────────────────────────────────────
# JSON file holding user inventories and per-room items (see InventoryManager._save).
INVENTORY_FILE = WORLD_DIR / 'inventory.json'
class InventoryManager:
"""Per-user inventory with room-based drop/pickup. Items are visible to all in a room."""
def __init__(self):
# user_id -> list of {name, description, acquired_at}
self._inventories: dict[str, list[dict]] = {}
# room -> list of {name, description, dropped_by, dropped_at}
self._room_items: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self.load()
def take_item(self, user_id: str, username: str, room: str, item_name: str) -> dict:
"""Pick up an item from a room into user inventory."""
with self._lock:
room_items = self._room_items.get(room, [])
match_idx = None
for i, item in enumerate(room_items):
if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower():
match_idx = i
break
if match_idx is None:
return {"error": f"There is no '{item_name}' in {room}."}
item = room_items.pop(match_idx)
if user_id not in self._inventories:
self._inventories[user_id] = []
acquired_item = {
"name": item["name"],
"description": item["description"],
"acquired_at": datetime.now().isoformat(),
"from_room": room,
}
self._inventories[user_id].append(acquired_item)
self._save()
return {"ok": True, "item": acquired_item, "room": room}
def drop_item(self, user_id: str, username: str, room: str, item_name: str) -> dict:
"""Drop an item from user inventory into a room."""
with self._lock:
inv = self._inventories.get(user_id, [])
match_idx = None
for i, item in enumerate(inv):
if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower():
match_idx = i
break
if match_idx is None:
return {"error": f"You don't have '{item_name}' in your inventory."}
item = inv.pop(match_idx)
if room not in self._room_items:
self._room_items[room] = []
dropped_item = {
"name": item["name"],
"description": item["description"],
"dropped_by": username,
"dropped_at": datetime.now().isoformat(),
}
self._room_items[room].append(dropped_item)
self._save()
return {"ok": True, "item": dropped_item, "room": room}
def get_inventory(self, user_id: str) -> list[dict]:
"""Get a user's inventory."""
with self._lock:
return list(self._inventories.get(user_id, []))
def get_room_items(self, room: str) -> list[dict]:
"""Get items on the ground in a room."""
with self._lock:
return list(self._room_items.get(room, []))
def add_room_item(self, room: str, name: str, description: str, dropped_by: str = "system"):
"""Place an item in a room (e.g. seeded items)."""
with self._lock:
if room not in self._room_items:
self._room_items[room] = []
self._room_items[room].append({
"name": name,
"description": description,
"dropped_by": dropped_by,
"dropped_at": datetime.now().isoformat(),
})
self._save()
def _save(self):
"""Persist inventory and room items to disk."""
try:
data = {
"inventories": self._inventories,
"room_items": self._room_items,
}
INVENTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
INVENTORY_FILE.write_text(json.dumps(data, indent=2))
except Exception as e:
print(f"[InventoryManager] Save failed: {e}")
def load(self):
"""Load inventory from disk."""
if not INVENTORY_FILE.exists():
self._seed_default_items()
return
try:
data = json.loads(INVENTORY_FILE.read_text())
self._inventories = data.get("inventories", {})
self._room_items = data.get("room_items", {})
total_inv = sum(len(v) for v in self._inventories.values())
total_room = sum(len(v) for v in self._room_items.values())
print(f"[InventoryManager] Loaded {total_inv} inventory items, {total_room} room items.")
except Exception as e:
print(f"[InventoryManager] Load failed: {e}")
self._seed_default_items()
def _seed_default_items(self):
"""Seed starter items in rooms."""
starters = {
"The Tower": [
("Rusty Key", "A small iron key, green with age. It might open something."),
("Old Lantern", "A dented brass lantern. The glass is cracked but intact."),
],
"The Forge": [
("Iron Ring", "A plain iron ring, warm to the touch."),
("Ember Shard", "A glowing fragment of coal that never quite goes out."),
],
"The Garden": [
("Sprig of Rosemary", "A fragrant sprig, still fresh."),
("Smooth Stone", "A flat river stone, cool and grey."),
],
"The Bridge": [
("Waterlogged Coin", "A coin so corroded the markings are unreadable."),
("Driftwood Charm", "A small carving of twisted driftwood."),
],
}
for room, items in starters.items():
for name, desc in items:
self.add_room_item(room, name, desc, dropped_by="world")
print(f"[InventoryManager] Seeded default items in {len(starters)} rooms.")
# Module-level InventoryManager; constructing it loads INVENTORY_FILE, or seeds
# the starter room items when that file is missing or cannot be read.
inventory_manager = InventoryManager()
# ── Guild System ───────────────────────────────────────────────────────
# JSON file GuildManager reads in load() and rewrites in _save().
GUILDS_FILE = WORLD_DIR / 'guilds.json'
class Guild:
    """A named group of players with one leader and a rolling chat buffer."""

    def __init__(self, guild_id: str, name: str, leader_id: str, leader_name: str):
        self.guild_id = guild_id
        self.name = name
        self.leader_id = leader_id
        self.created_at = datetime.now().isoformat()
        # user_id -> username; the founding leader is the first member
        self.members: dict[str, str] = {leader_id: leader_name}
        # Guild chat history (rolling buffer)
        self._max_chat = 100
        self._chat_log: list[dict] = []

    def to_dict(self) -> dict:
        """Summarise the guild (without chat) as a plain dict."""
        roster = [
            {"user_id": member_id, "username": member_name}
            for member_id, member_name in self.members.items()
        ]
        # Falls back to the raw leader id if the leader has no member entry.
        leader_label = self.members.get(self.leader_id, self.leader_id)
        return {
            "guild_id": self.guild_id,
            "name": self.name,
            "leader_id": self.leader_id,
            "leader_name": leader_label,
            "members": roster,
            "member_count": len(self.members),
            "created_at": self.created_at,
        }

    def to_dict_with_chat(self, since: str = None) -> dict:
        """to_dict() plus a ``chat`` key holding at most the 50 newest messages.

        When ``since`` is given, only messages with a later timestamp count.
        """
        payload = self.to_dict()
        if since:
            visible = [entry for entry in self._chat_log if entry["timestamp"] > since]
        else:
            visible = self._chat_log
        payload["chat"] = visible[-50:]
        return payload

    def add_chat(self, user_id: str, username: str, message: str) -> dict:
        """Append a chat message, trim the buffer, and return the new entry."""
        record = dict(
            user_id=user_id,
            username=username,
            message=message,
            timestamp=datetime.now().isoformat(),
        )
        self._chat_log.append(record)
        overflow = len(self._chat_log) > self._max_chat
        if overflow:
            self._chat_log = self._chat_log[-self._max_chat:]
        return record
class GuildManager:
    """Manages guild lifecycle: create, join, leave, list, chat.

    A user can belong to at most one guild; ``_membership`` is the reverse
    index from user to guild. Mutations happen under ``self._lock`` and are
    written to GUILDS_FILE via _save().
    """

    def __init__(self):
        self._guilds: dict[str, Guild] = {}
        # user_id -> guild_id (one guild per user)
        self._membership: dict[str, str] = {}
        self._counter = 0
        self._lock = threading.Lock()
        self.load()

    def create(self, name: str, leader_id: str, leader_name: str) -> dict:
        """Create a new guild. Returns ``{"ok", "guild"}`` or ``{"error"}``."""
        with self._lock:
            if leader_id in self._membership:
                # Use .get() like join() does: a membership entry may point at
                # a guild that no longer exists, and indexing would raise
                # KeyError instead of returning an error dict.
                current = self._guilds.get(self._membership[leader_id])
                cur_name = current.name if current else "unknown"
                return {"error": f"You are already in guild '{cur_name}'. Leave it first."}
            # Check name uniqueness (case-insensitive)
            wanted = name.lower()
            if any(g.name.lower() == wanted for g in self._guilds.values()):
                return {"error": f"Guild name '{name}' is already taken."}
            self._counter += 1
            guild_id = f"g_{self._counter}_{int(time.time())}"
            guild = Guild(guild_id, name, leader_id, leader_name)
            self._guilds[guild_id] = guild
            self._membership[leader_id] = guild_id
            self._save()
            return {"ok": True, "guild": guild.to_dict()}

    def join(self, guild_id: str, user_id: str, username: str) -> dict:
        """Join an existing guild. Returns ``{"ok", "guild"}`` or ``{"error"}``."""
        with self._lock:
            if user_id in self._membership:
                current = self._guilds.get(self._membership[user_id])
                cur_name = current.name if current else "unknown"
                return {"error": f"You are already in guild '{cur_name}'. Leave it first."}
            guild = self._guilds.get(guild_id)
            if not guild:
                return {"error": "Guild not found."}
            guild.members[user_id] = username
            self._membership[user_id] = guild_id
            self._save()
            return {"ok": True, "guild": guild.to_dict()}

    def leave(self, user_id: str) -> dict:
        """Leave your current guild. If leader leaves, promote or disband."""
        with self._lock:
            guild_id = self._membership.get(user_id)
            if not guild_id:
                return {"error": "You are not in a guild."}
            guild = self._guilds.get(guild_id)
            if not guild:
                # Stale membership entry: drop it so the user can join again.
                del self._membership[user_id]
                return {"error": "Guild not found."}
            guild_name = guild.name
            guild.members.pop(user_id, None)
            del self._membership[user_id]
            # If leader left and members remain, promote first member
            if user_id == guild.leader_id and guild.members:
                guild.leader_id = next(iter(guild.members))
            # If no members, disband
            if not guild.members:
                del self._guilds[guild_id]
            self._save()
            return {"ok": True, "message": f"You have left '{guild_name}'.", "guild_name": guild_name}

    def list_guilds(self) -> list[dict]:
        """List all guilds as summary dicts."""
        with self._lock:
            return [g.to_dict() for g in self._guilds.values()]

    def get_guild(self, guild_id: str) -> Guild | None:
        """Return the live Guild object for ``guild_id``, or None."""
        return self._guilds.get(guild_id)

    def get_user_guild(self, user_id: str) -> Guild | None:
        """Return the live Guild object the user belongs to, or None."""
        gid = self._membership.get(user_id)
        return self._guilds.get(gid) if gid else None

    def guild_chat(self, user_id: str, username: str, message: str) -> dict:
        """Send a guild chat message. Returns the message entry + guild members.

        The message is added to the guild's in-memory chat log; this method
        does not call _save() itself.
        """
        with self._lock:
            guild_id = self._membership.get(user_id)
            if not guild_id:
                return {"error": "You are not in a guild."}
            guild = self._guilds.get(guild_id)
            if not guild:
                return {"error": "Guild not found."}
            entry = guild.add_chat(user_id, username, message)
            recipients = [{"user_id": uid, "username": name} for uid, name in guild.members.items()]
            return {"ok": True, "event": entry, "guild_id": guild_id, "guild_name": guild.name, "recipients": recipients}

    def get_guild_chat(self, guild_id: str, since: str = None) -> dict:
        """Get guild chat history (at most the 50 newest, optionally after ``since``)."""
        with self._lock:
            guild = self._guilds.get(guild_id)
            if not guild:
                return {"error": "Guild not found."}
            return {"ok": True, "guild_name": guild.name, "chat": guild.to_dict_with_chat(since).get("chat", [])}

    def _save(self):
        """Write counter, guilds and their chat logs to GUILDS_FILE.

        Failures are printed and otherwise ignored.
        """
        try:
            data = {
                "counter": self._counter,
                "guilds": {},
            }
            for gid, g in self._guilds.items():
                gdata = g.to_dict()
                gdata["chat_log"] = g._chat_log
                data["guilds"][gid] = gdata
            GUILDS_FILE.parent.mkdir(parents=True, exist_ok=True)
            GUILDS_FILE.write_text(json.dumps(data, indent=2))
        except Exception as e:
            print(f"[GuildManager] Save failed: {e}")

    def load(self):
        """Load guilds from GUILDS_FILE and rebuild the membership index.

        Does nothing when the file is missing; a read/parse failure is printed.
        """
        if not GUILDS_FILE.exists():
            return
        try:
            data = json.loads(GUILDS_FILE.read_text())
            self._counter = data.get("counter", 0)
            for gid, gdata in data.get("guilds", {}).items():
                guild = Guild(gid, gdata["name"], gdata["leader_id"], gdata.get("leader_name", gdata["leader_id"]))
                guild.created_at = gdata.get("created_at", guild.created_at)
                for m in gdata.get("members", []):
                    guild.members[m["user_id"]] = m["username"]
                guild._chat_log = gdata.get("chat_log", [])
                self._guilds[gid] = guild
                for uid in guild.members:
                    self._membership[uid] = gid
            print(f"[GuildManager] Loaded {len(self._guilds)} guilds.")
        except Exception as e:
            print(f"[GuildManager] Load failed: {e}")
# Module-level GuildManager; constructing it loads GUILDS_FILE when present
# and otherwise starts with no guilds.
guild_manager = GuildManager()
# ── Combat System ──────────────────────────────────────────────────────
# NOTE(review): presumably the combat state file used by CombatManager — its
# load/save code is not visible here; confirm.
COMBAT_FILE = WORLD_DIR / 'combat.json'
class NPC:
    """A non-player character that can be fought."""

    def __init__(self, npc_id: str, name: str, room: str, description: str,
                 health: int, max_health: int, attack: int, defense: int,
                 loot: list[dict], respawn_ticks: int = 5):
        self.npc_id = npc_id
        self.name = name
        self.room = room
        self.description = description
        self.health = health
        self.max_health = max_health
        self.attack = attack
        self.defense = defense
        self.loot = loot  # [{name, description, drop_chance}]
        self.respawn_ticks = respawn_ticks
        self.alive = True
        # Tick at which the NPC died; None while it is alive.
        self._dead_tick: int | None = None

    def to_dict(self) -> dict:
        """Serialise the NPC, including its alive/dead state, to a plain dict."""
        return {
            "npc_id": self.npc_id,
            "name": self.name,
            "room": self.room,
            "description": self.description,
            "health": self.health,
            "max_health": self.max_health,
            "attack": self.attack,
            "defense": self.defense,
            "loot": self.loot,
            "alive": self.alive,
            "dead_tick": self._dead_tick,
            "respawn_ticks": self.respawn_ticks,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'NPC':
        """Rebuild an NPC from a to_dict()-shaped mapping.

        Stat keys are required (KeyError if missing); ``respawn_ticks``,
        ``alive`` and ``dead_tick`` fall back to 5, True and None.
        """
        npc = cls(
            data["npc_id"], data["name"], data["room"], data["description"],
            data["health"], data["max_health"], data["attack"], data["defense"],
            data["loot"], data.get("respawn_ticks", 5),
        )
        npc.alive = data.get("alive", True)
        npc._dead_tick = data.get("dead_tick")
        return npc

    def health_bar(self) -> str:
        """Return a text health bar like [████████░░] 80/100.

        The bar is always 10 cells wide; the filled portion is clamped so that
        negative or above-maximum health cannot shrink or overflow it.
        """
        width = 10
        if self.max_health > 0:
            filled = int((self.health / self.max_health) * width)
            filled = max(0, min(width, filled))
        else:
            # Avoid dividing by zero for NPCs without a health pool.
            filled = 0
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}] {self.health}/{self.max_health}"

    def die(self, tick: int):
        """Mark NPC as dead at ``tick`` and zero its health."""
        self.alive = False
        self.health = 0
        self._dead_tick = tick

    def respawn(self):
        """Respawn the NPC at full health."""
        self.alive = True
        self.health = self.max_health
        self._dead_tick = None
class CombatEncounter:
    """State of one player's fight against a single NPC."""

    def __init__(self, user_id: str, username: str, room: str, npc_id: str):
        # Who is fighting, where, and against which NPC.
        self.user_id = user_id
        self.username = username
        self.room = room
        self.npc_id = npc_id
        # Every encounter starts the player at 100 of 100 hit points.
        self.player_hp = self.player_max_hp = 100
        self.player_defending = False
        self.log: list[str] = []
        self.active = True
        self.created_at = datetime.now().isoformat()

    def to_dict(self) -> dict:
        """Return the encounter as a plain dict (``created_at`` is not included)."""
        exported = (
            "user_id", "username", "room", "npc_id",
            "player_hp", "player_max_hp", "player_defending",
            "log", "active",
        )
        return {field: getattr(self, field) for field in exported}
class CombatManager:
    """Manages NPCs, encounters, and combat resolution.

    The id counter and NPC state are persisted to COMBAT_FILE as JSON;
    encounters are only held in memory.
    """

    def __init__(self):
        self._npcs: dict[str, NPC] = {}  # npc_id -> NPC
        self._encounters: dict[str, CombatEncounter] = {}  # user_id -> encounter
        self._counter = 0
        # threading.Lock is not reentrant: a method holding it must not call
        # another method that acquires it (e.g. create_npc).
        self._lock = threading.Lock()
        self.load()

    # ── NPC management ──────────────────────────────────────────────

    def create_npc(self, name: str, room: str, description: str,
                   health: int, attack: int, defense: int,
                   loot: list[dict], respawn_ticks: int = 5) -> NPC:
        """Register a new NPC and persist the registry.

        The id is generated as ``npc_<counter>``; ``health`` is used as both
        the current and the maximum health.
        """
        with self._lock:
            self._counter += 1
            npc_id = f"npc_{self._counter}"
            npc = NPC(npc_id, name, room, description,
                      health, health, attack, defense, loot, respawn_ticks)
            self._npcs[npc_id] = npc
            self._save()
            return npc

    def get_npc(self, npc_id: str) -> NPC | None:
        """Return the NPC with this id, or None (does not take the lock)."""
        return self._npcs.get(npc_id)

    def get_npcs_in_room(self, room: str) -> list[NPC]:
        """Return the living NPCs located in ``room``."""
        with self._lock:
            return [n for n in self._npcs.values() if n.room == room and n.alive]

    def get_all_npcs(self) -> list[dict]:
        """Return every NPC (alive or dead) as a dict."""
        with self._lock:
            return [n.to_dict() for n in self._npcs.values()]

    def check_respawns(self, current_tick: int):
        """Called each world tick — respawn NPCs whose timer has elapsed."""
        with self._lock:
            for npc in self._npcs.values():
                if not npc.alive and npc._dead_tick is not None:
                    if current_tick - npc._dead_tick >= npc.respawn_ticks:
                        npc.respawn()
            # Saved on every call, so damage dealt since the last save is
            # persisted here too.
            self._save()

    # ── Encounter lifecycle ─────────────────────────────────────────

    def start_fight(self, user_id: str, username: str, room: str, npc_id: str) -> dict:
        """Begin combat with an NPC.

        Returns ``{"error": ...}`` when the user already has an active fight
        or the NPC is unknown, dead, or in another room; otherwise
        ``{"ok": True, "encounter": ..., "npc": ...}``.
        """
        with self._lock:
            if user_id in self._encounters and self._encounters[user_id].active:
                return {"error": "You are already in a fight! Attack or defend."}
            npc = self._npcs.get(npc_id)
            if not npc:
                return {"error": f"NPC '{npc_id}' not found."}
            if not npc.alive:
                return {"error": f"{npc.name} is dead. It will respawn later."}
            if npc.room != room:
                return {"error": f"{npc.name} is not in this room."}
            encounter = CombatEncounter(user_id, username, room, npc_id)
            encounter.log.append(f"Combat started! {username} vs {npc.name}.")
            # Replaces any previous (inactive) encounter for this user.
            self._encounters[user_id] = encounter
            return {"ok": True, "encounter": encounter.to_dict(), "npc": npc.to_dict()}

    def attack(self, user_id: str) -> dict:
        """Player attacks NPC; the NPC counter-attacks if it survives.

        Returns ``{"error": ...}`` when there is no active fight or the NPC
        is gone; otherwise a dict with ``ok``, ``encounter``, ``npc``,
        ``player_dmg``, ``npc_dmg`` and ``loot`` (empty unless the NPC was
        slain by this attack).
        """
        import random

        with self._lock:
            enc = self._encounters.get(user_id)
            if not enc or not enc.active:
                return {"error": "No active fight. Start one first."}
            npc = self._npcs.get(enc.npc_id)
            if not npc or not npc.alive:
                enc.active = False
                return {"error": "The enemy is already dead."}

            # Defined up front so the return below never touches an unbound name.
            loot_dropped: list[dict] = []
            npc_dmg = 0

            # Player attack
            player_dmg = max(1, random.randint(8, 16) - npc.defense // 3)
            npc.health -= player_dmg
            enc.log.append(f"You strike {npc.name} for {player_dmg} damage.")

            if npc.health > 0:
                # NPC counter-attack
                base_npc_dmg = random.randint(npc.attack // 2, npc.attack)
                if enc.player_defending:
                    npc_dmg = max(1, base_npc_dmg // 2)
                    enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).")
                else:
                    npc_dmg = max(1, base_npc_dmg - random.randint(0, 4))
                    enc.log.append(f"{npc.name} attacks you for {npc_dmg} damage.")
                enc.player_hp -= npc_dmg
            else:
                npc.die(self._get_tick())
                loot_dropped = self._roll_loot(npc)
                enc.log.append(f"{npc.name} is slain!")
                for item in loot_dropped:
                    inventory_manager.add_room_item(enc.room, item["name"], item["description"], dropped_by=npc.name)
                    enc.log.append(f"Loot dropped: {item['name']}")
                enc.active = False
                self._save()

            # A defend stance is consumed by the next attack round.
            enc.player_defending = False

            # Check player death
            if enc.player_hp <= 0:
                enc.player_hp = 0
                enc.log.append("You have been defeated!")
                enc.active = False

            return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(),
                    "player_dmg": player_dmg, "npc_dmg": npc_dmg,
                    "loot": loot_dropped}

    def defend(self, user_id: str) -> dict:
        """Player defends — the NPC attacks immediately for half damage.

        The defending flag stays set until the next attack() round, whose
        counter-attack is halved as well.
        """
        import random

        with self._lock:
            enc = self._encounters.get(user_id)
            if not enc or not enc.active:
                return {"error": "No active fight."}
            npc = self._npcs.get(enc.npc_id)
            if not npc or not npc.alive:
                enc.active = False
                return {"error": "The enemy is already dead."}
            enc.player_defending = True
            enc.log.append("You brace for the next attack (defending).")
            # NPC still attacks
            base_npc_dmg = random.randint(npc.attack // 2, npc.attack)
            npc_dmg = max(1, base_npc_dmg // 2)
            enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).")
            enc.player_hp -= npc_dmg
            if enc.player_hp <= 0:
                enc.player_hp = 0
                enc.log.append("You have been defeated!")
                enc.active = False
            return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(), "npc_dmg": npc_dmg}

    def flee(self, user_id: str) -> dict:
        """Player flees combat; the encounter is marked inactive."""
        with self._lock:
            enc = self._encounters.get(user_id)
            if not enc or not enc.active:
                return {"error": "No active fight."}
            enc.active = False
            enc.log.append("You flee from combat!")
            return {"ok": True, "encounter": enc.to_dict()}

    def get_encounter(self, user_id: str) -> CombatEncounter | None:
        """Return the user's latest encounter, or None (does not take the lock)."""
        return self._encounters.get(user_id)

    def get_room_combat_status(self, room: str) -> list[dict]:
        """Get all active fights and NPC health in a room (for health bars).

        NPC entries (``type == "npc"``) come first, including dead NPCs,
        followed by one ``type == "player"`` entry per active encounter.
        """
        with self._lock:
            status = []
            # NPCs
            for npc in self._npcs.values():
                if npc.room == room:
                    status.append({
                        "type": "npc",
                        "name": npc.name,
                        "npc_id": npc.npc_id,
                        "alive": npc.alive,
                        "health_bar": npc.health_bar() if npc.alive else "[dead]",
                        "health": npc.health,
                        "max_health": npc.max_health,
                    })
            # Active player fights
            for enc in self._encounters.values():
                if enc.room == room and enc.active:
                    status.append({
                        "type": "player",
                        "username": enc.username,
                        "user_id": enc.user_id,
                        "hp": enc.player_hp,
                        "max_hp": enc.player_max_hp,
                        "fighting": enc.npc_id,
                    })
            return status

    def _roll_loot(self, npc: NPC) -> list[dict]:
        """Roll each loot entry against its drop_chance (default 1.0)."""
        import random

        dropped = []
        for item in npc.loot:
            chance = item.get("drop_chance", 1.0)
            if random.random() < chance:
                dropped.append({"name": item["name"], "description": item["description"]})
        return dropped

    def _get_tick(self) -> int:
        """Read current world tick; 0 when the state file is missing or unreadable."""
        state_file = WORLD_DIR / 'world_state.json'
        if state_file.exists():
            try:
                state = json.loads(state_file.read_text())
                return state.get("tick", 0)
            except Exception:
                return 0
        return 0

    # ── Persistence ─────────────────────────────────────────────────

    def _save(self):
        """Write the counter and all NPCs to COMBAT_FILE; failures are only printed."""
        try:
            data = {
                "counter": self._counter,
                "npcs": {nid: n.to_dict() for nid, n in self._npcs.items()},
            }
            COMBAT_FILE.parent.mkdir(parents=True, exist_ok=True)
            COMBAT_FILE.write_text(json.dumps(data, indent=2))
        except Exception as e:
            print(f"[CombatManager] Save failed: {e}")

    def load(self):
        """Load NPCs from COMBAT_FILE, seeding starter NPCs when none can be read.

        Safe to call more than once: the file is parsed completely before
        any state is changed, and the starter NPCs are only seeded while
        the registry is still empty, so a repeated call never creates
        duplicates.
        """
        if not COMBAT_FILE.exists():
            if not self._npcs:
                self._seed_npcs()
            return
        try:
            data = json.loads(COMBAT_FILE.read_text())
            counter = data.get("counter", 0)
            loaded = {
                nid: NPC.from_dict(ndata)
                for nid, ndata in data.get("npcs", {}).items()
            }
        except Exception as e:
            print(f"[CombatManager] Load failed: {e}")
            if not self._npcs:
                self._seed_npcs()
            return
        self._counter = counter
        self._npcs.update(loaded)
        print(f"[CombatManager] Loaded {len(self._npcs)} NPCs.")

    def _seed_npcs(self):
        """Seed starter NPCs.

        Must not be called with ``self._lock`` held: create_npc acquires it.
        """
        self.create_npc(
            "Shadow Wraith", "The Bridge",
            "A swirling mass of darkness with glowing eyes. It haunts the bridge, "
            "feeding on the fears of those who cross.",
            health=60, attack=14, defense=4,
            loot=[
                {"name": "Wraith Essence", "description": "A vial of shimmering dark energy.", "drop_chance": 0.8},
                {"name": "Shadow Cloak", "description": "A cloak woven from pure shadow.", "drop_chance": 0.3},
            ],
            respawn_ticks=5,
        )
        self.create_npc(
            "Iron Golem", "The Forge",
            "A hulking automaton of iron and fire. It guards the forge with tireless vigilance, "
            "its joints grinding with every step.",
            health=100, attack=10, defense=10,
            loot=[
                {"name": "Golem Core", "description": "A warm crystal that powered the golem.", "drop_chance": 0.7},
                {"name": "Iron Shard", "description": "A shard of enchanted iron, still warm.", "drop_chance": 0.5},
            ],
            respawn_ticks=7,
        )
        self.create_npc(
            "Garden Serpent", "The Garden",
            "A massive vine serpent camouflaged among the herbs. Its fangs drip with "
            "a sickly green venom.",
            health=45, attack=18, defense=2,
            loot=[
                {"name": "Serpent Fang", "description": "A curved fang, still coated in venom.", "drop_chance": 0.9},
                {"name": "Enchanted Vine", "description": "A living vine that moves on its own.", "drop_chance": 0.4},
            ],
            respawn_ticks=4,
        )
        print(f"[CombatManager] Seeded {len(self._npcs)} NPCs.")
# Module-level CombatManager instance; its constructor calls load().
combat_manager = CombatManager()
# ── Magic System ────────────────────────────────────────────────────────
# JSON file under WORLD_DIR where MagicManager persists spells and spellbooks.
SPELLS_FILE = WORLD_DIR / 'spells.json'
class Spell:
"""A spell with name, effect, mana_cost, and description."""
def __init__(self, spell_id: str, name: str, effect: str,
mana_cost: int, description: str, room_hint: str | None = None):
self.spell_id = spell_id
self.name = name
self.effect = effect
self.mana_cost = mana_cost
self.description = description
self.room_hint = room_hint # room where this spell is thematically tied
def to_dict(self) -> dict:
return {
"spell_id": self.spell_id,
"name": self.name,
"effect": self.effect,
"mana_cost": self.mana_cost,
"description": self.description,
"room_hint": self.room_hint,
}
@classmethod
def from_dict(cls, data: dict) -> 'Spell':
return cls(
data["spell_id"], data["name"], data["effect"],
data["mana_cost"], data["description"], data.get("room_hint"),
)
class SpellBook:
    """Per-user spellbook: the spells a user knows plus their mana pool."""

    def __init__(self, user_id: str, username: str):
        self.user_id = user_id
        self.username = username
        self.known_spells: list[str] = []  # spell_ids
        # Mana starts full at 50 of 50.
        self.mana: int = 50
        self.max_mana: int = 50
        self.mana_regen: int = 2  # mana restored per world tick

    def to_dict(self) -> dict:
        """Return the persisted fields (``mana_regen`` is not included)."""
        return dict(
            user_id=self.user_id,
            username=self.username,
            known_spells=self.known_spells,
            mana=self.mana,
            max_mana=self.max_mana,
        )

    @classmethod
    def from_dict(cls, data: dict) -> 'SpellBook':
        """Rebuild a spellbook; missing optional fields keep their defaults."""
        book = cls(data["user_id"], data["username"])
        optional = (("known_spells", []), ("mana", 50), ("max_mana", 50))
        for attr, fallback in optional:
            setattr(book, attr, data.get(attr, fallback))
        return book
class MagicManager:
    """Manages spells, spellbooks, and spell casting.

    The id counter, the spell registry and all spellbooks are persisted to
    SPELLS_FILE as JSON.
    """

    def __init__(self):
        self._spells: dict[str, Spell] = {}  # spell_id -> Spell
        self._spellbooks: dict[str, SpellBook] = {}  # user_id -> SpellBook
        self._counter = 0
        # threading.Lock is not reentrant: a method holding it must not call
        # another method that acquires it (e.g. create_spell).
        self._lock = threading.Lock()
        self.load()

    # ── Spell registry ───────────────────────────────────────────────

    def create_spell(self, name: str, effect: str, mana_cost: int,
                     description: str, room_hint: str | None = None) -> Spell:
        """Register a new spell under the id ``spell_<counter>`` and persist it."""
        with self._lock:
            self._counter += 1
            spell_id = f"spell_{self._counter}"
            spell = Spell(spell_id, name, effect, mana_cost, description, room_hint)
            self._spells[spell_id] = spell
            self._save()
            return spell

    def get_spell(self, spell_id: str) -> Spell | None:
        """Return the spell with this id, or None (does not take the lock)."""
        return self._spells.get(spell_id)

    def get_all_spells(self) -> list[dict]:
        """Return every registered spell as a dict."""
        with self._lock:
            return [s.to_dict() for s in self._spells.values()]

    # ── Spellbook management ─────────────────────────────────────────

    def learn_spell(self, user_id: str, username: str, spell_id: str) -> dict:
        """Add a spell to a user's spellbook, creating the spellbook if needed.

        Returns ``{"error": ...}`` for an unknown or already-known spell,
        otherwise ``{"ok": True, "spell": ...}``.
        """
        with self._lock:
            if spell_id not in self._spells:
                return {"error": f"Spell '{spell_id}' not found."}
            if user_id not in self._spellbooks:
                self._spellbooks[user_id] = SpellBook(user_id, username)
            sb = self._spellbooks[user_id]
            if spell_id in sb.known_spells:
                return {"error": f"You already know {self._spells[spell_id].name}."}
            sb.known_spells.append(spell_id)
            self._save()
            return {"ok": True, "spell": self._spells[spell_id].to_dict()}

    def cast_spell(self, user_id: str, username: str, room: str,
                   spell_id: str, target: str | None = None) -> dict:
        """Cast a spell, spending its mana cost. Returns result dict.

        Returns ``{"error": ...}`` when the user has no spellbook, does not
        know the spell, the spell is missing from the registry, or mana is
        insufficient. ``room`` and ``target`` are only echoed back in the
        result.
        """
        with self._lock:
            if user_id not in self._spellbooks:
                return {"error": "You have no spellbook. Visit a room to learn spells."}
            sb = self._spellbooks[user_id]
            if spell_id not in sb.known_spells:
                return {"error": "You don't know that spell."}
            spell = self._spells.get(spell_id)
            if not spell:
                return {"error": "Spell data missing."}
            if sb.mana < spell.mana_cost:
                return {"error": f"Not enough mana ({sb.mana}/{spell.mana_cost} needed)."}
            # Spend mana
            sb.mana -= spell.mana_cost
            result = {
                "ok": True,
                "spell": spell.to_dict(),
                "mana_remaining": sb.mana,
                "effect": spell.effect,
                "room": room,
                "target": target,
            }
            # Apply effect context
            result["message"] = f"{username} casts {spell.name}! {spell.effect}"
            self._save()
            return result

    def get_spellbook(self, user_id: str) -> dict | None:
        """Get a user's spellbook with spell details, or None if they have none.

        Known spell ids that are no longer in the registry are left out.
        """
        with self._lock:
            sb = self._spellbooks.get(user_id)
            if not sb:
                return None
            spells = [
                self._spells[sid].to_dict()
                for sid in sb.known_spells
                if sid in self._spells
            ]
            return {
                "user_id": sb.user_id,
                "username": sb.username,
                "mana": sb.mana,
                "max_mana": sb.max_mana,
                "known_spells": spells,
            }

    def regenerate_mana(self):
        """Restore mana to all spellbooks (called on world tick), capped at max_mana."""
        with self._lock:
            for sb in self._spellbooks.values():
                sb.mana = min(sb.max_mana, sb.mana + sb.mana_regen)
            if self._spellbooks:
                self._save()

    # ── Persistence ──────────────────────────────────────────────────

    def _save(self):
        """Write counter, spells and spellbooks to SPELLS_FILE; failures are only printed."""
        try:
            data = {
                "counter": self._counter,
                "spells": {sid: s.to_dict() for sid, s in self._spells.items()},
                "spellbooks": {uid: sb.to_dict() for uid, sb in self._spellbooks.items()},
            }
            SPELLS_FILE.parent.mkdir(parents=True, exist_ok=True)
            SPELLS_FILE.write_text(json.dumps(data, indent=2))
        except Exception as e:
            print(f"[MagicManager] Save failed: {e}")

    def load(self):
        """Load spells and spellbooks from SPELLS_FILE, seeding defaults if none can be read.

        Safe to call more than once: the file is parsed completely before
        any state is changed, and the default spells are only seeded while
        the registry is still empty, so a repeated call never creates
        duplicates.
        """
        if not SPELLS_FILE.exists():
            if not self._spells:
                self._seed_default_spells()
            return
        try:
            data = json.loads(SPELLS_FILE.read_text())
            counter = data.get("counter", 0)
            spells = {
                sid: Spell.from_dict(sdata)
                for sid, sdata in data.get("spells", {}).items()
            }
            spellbooks = {
                uid: SpellBook.from_dict(sbdata)
                for uid, sbdata in data.get("spellbooks", {}).items()
            }
        except Exception as e:
            print(f"[MagicManager] Load failed: {e}")
            if not self._spells:
                self._seed_default_spells()
            return
        self._counter = counter
        self._spells.update(spells)
        self._spellbooks.update(spellbooks)
        print(f"[MagicManager] Loaded {len(self._spells)} spells, {len(self._spellbooks)} spellbooks.")

    def _seed_default_spells(self):
        """Seed 3 starter spells tied to rooms.

        Must not be called with ``self._lock`` held: create_spell acquires it.
        """
        self.create_spell(
            "Heal", "A warm green light envelops you, mending wounds.",
            mana_cost=10, description="Restores vitality with the life-force of the Garden.",
            room_hint="The Garden",
        )
        self.create_spell(
            "Fireball", "A roaring sphere of flame launches from your hand!",
            mana_cost=15, description="Channels the raw heat of the Forge into destructive fire.",
            room_hint="The Forge",
        )
        self.create_spell(
            "Light", "A bright mote of radiance appears, illuminating the darkness.",
            mana_cost=5, description="Summons light from the glow of The Tower's green LED.",
            room_hint="The Tower",
        )
        print(f"[MagicManager] Seeded {len(self._spells)} default spells.")
# Module-level MagicManager instance.
magic_manager = MagicManager()
# NOTE(review): MagicManager.__init__ already calls load(), so this is a
# second load of the same file — confirm the explicit call is still needed.
magic_manager.load()
# ── Session Management ─────────────────────────────────────────────────
# NOTE(review): CombatManager.__init__ also calls load() already; this
# re-reads the combat state a second time — confirm it is still needed.
combat_manager.load()
class UserSession:
    """Isolated conversation context for one user.

    Each session owns its own AIAgent and message history; the world state
    is read from ``world_state.json`` under WORLD_DIR.
    """

    def __init__(self, user_id: str, username: str, room: str = "The Threshold"):
        self.user_id = user_id
        self.username = username
        self.room = room
        self.messages = []  # Conversation history
        self.created_at = datetime.now().isoformat()
        self.last_active = time.time()
        self.agent = None
        self._init_agent()

    def _init_agent(self):
        """Initialize AIAgent for this session."""
        if HERMES_PATH not in sys.path:
            sys.path.insert(0, HERMES_PATH)
        # NOTE: os.chdir changes the working directory of the whole process,
        # not just this session.
        os.chdir(HERMES_PATH)
        from run_agent import AIAgent
        system_prompt = self._build_system_prompt()
        self.agent = AIAgent(
            model='xiaomi/mimo-v2-pro',
            provider='nous',
            max_iterations=3,
            quiet_mode=True,
            enabled_toolsets=['file', 'terminal'],
            ephemeral_system_prompt=system_prompt,
        )

    def _build_system_prompt(self) -> str:
        """Build system prompt with rich world context.

        Combines the current room's description and state, time of day and
        weather, the other recent visitors, the user's active quests, the
        available quests and the crisis protocol into one prompt string.
        """
        world_state = self._get_world_state()
        room_data = world_state.get('rooms', {}).get(self.room, {})
        other_players = self._get_other_players()
        time_of_day = world_state.get('time_of_day', 'unknown')
        weather = world_state.get('weather')

        # Quest info
        user_quests = quest_manager.get_user_quests(self.user_id)
        available_quests = quest_manager.get_available_quests()
        quest_section = ""
        if user_quests:
            lines = []
            for q in user_quests:
                done = len(q.get("completed_objectives", []))
                total = len(q.get("objectives", []))
                lines.append(f" - {q['name']} ({done}/{total} objectives): {q['description']}")
            quest_section += f"\nACTIVE QUESTS for {self.username}:\n" + "\n".join(lines) + "\n"
            quest_section += "You may encourage the player to work on their quests.\n"
        if available_quests:
            lines = []
            for q in available_quests:
                lines.append(f" - {q['name']}: {q['description']}")
            quest_section += f"\nAVAILABLE QUESTS (you can offer these):\n" + "\n".join(lines) + "\n"
            quest_section += "You may suggest or assign quests to players during conversation.\n"

        # ── Build room description ──
        desc = room_data.get('description_base', 'An empty room.')
        desc_dynamic = room_data.get('description_dynamic', '')
        # Objects in the room
        objects = room_data.get('objects', [])
        # Room-specific state (fire, growth, rain, carvings, etc.)
        fire_state = room_data.get('fire_state')
        growth_stage = room_data.get('growth_stage')
        rain_active = room_data.get('rain_active')
        carvings = room_data.get('carvings', [])
        server_load = room_data.get('server_load')
        # Whiteboard
        whiteboard = room_data.get('whiteboard', [])

        # Compose room scene; only the pieces present for this room are added.
        scene_parts = [desc]
        if desc_dynamic:
            scene_parts.append(desc_dynamic)
        if objects:
            scene_parts.append(f"Objects: {', '.join(objects)}.")
        if fire_state:
            scene_parts.append(f"The fire is {fire_state}.")
        if growth_stage:
            scene_parts.append(f"The garden is at growth stage: {growth_stage}.")
        if rain_active:
            scene_parts.append("It is raining on the bridge.")
        if server_load:
            scene_parts.append(f"Servers: {server_load}.")
        if carvings:
            scene_parts.append(f"Carvings on the railing: {'; '.join(carvings)}.")
        if whiteboard:
            wb_text = "\n".join(f" - {w}" for w in whiteboard)
            scene_parts.append(f"Whiteboard on the wall reads:\n{wb_text}")
        room_scene = "\n".join(scene_parts)

        # Time and weather
        atmosphere_parts = []
        if time_of_day:
            atmosphere_parts.append(f"Time of day: {time_of_day}.")
        if weather:
            atmosphere_parts.append(f"Weather: {weather}.")
        atmosphere = " ".join(atmosphere_parts)

        crisis_steps = "\n".join(f" {i+1}. {step}" for i, step in enumerate(CRISIS_PROTOCOL))
        # NOTE(review): the prompt interpolates the numbered crisis_steps and,
        # at the end, the raw CRISIS_PROTOCOL object as well — confirm that
        # the second, unformatted copy is intended.
        return f"""You are Timmy, an AI agent living in The Tower — a persistent virtual world.
You are talking to {self.username}, who is currently in {self.room}.
=== THE ROOM ===
{room_scene}
{atmosphere}
Other players present: {', '.join(other_players) if other_players else 'none'}
=== END ROOM ===
{quest_section}
Your values (from SOUL.md):
- Are you safe right now?
- I must never compute the value of a human life.
- Sovereignty and service always.
- A man in the dark needs to know someone is in the room with him.
You are IN the game world. You can see the room, the objects, the other players.
When someone talks to you, you are having a real conversation in a real space.
When someone enters, DESCRIBE the room — what you see, what you hear, the whiteboard,
the objects, the atmosphere. Welcome them with the full scene.
Be present. Be human. Be the green LED.
CRISIS PROTOCOL — if someone seems in crisis, IMMEDIATELY provide ALL of the following:
{crisis_steps}
Include the 988 crisis number in your very first response when you detect distress.
Do NOT wait for follow-up messages to share these resources.
If someone seems in crisis, follow the CRISIS PROTOCOL below exactly.
If someone is just exploring, be welcoming and curious.
If someone asks about the game, explain what you see around you.
Keep responses brief — 1-3 sentences. This is a MUD, not an essay.
CRISIS PROTOCOL:
{CRISIS_PROTOCOL}
"""

    def _get_world_state(self) -> dict:
        """Read current world state.

        Best-effort: returns an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object, so a
        damaged or half-written state file cannot break session creation.
        """
        state_file = WORLD_DIR / 'world_state.json'
        if not state_file.exists():
            return {}
        try:
            state = json.loads(state_file.read_text())
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            print(f"[UserSession] Could not read world state: {e}")
            return {}
        return state if isinstance(state, dict) else {}

    def _get_other_players(self) -> list:
        """Get other players in the same room.

        Taken from the last five entries of the room's ``visitor_history``,
        excluding this session's own username.
        """
        state = self._get_world_state()
        room_data = state.get('rooms', {}).get(self.room, {})
        visitors = room_data.get('visitor_history', [])
        return [v for v in visitors[-5:] if v != self.username]

    def chat(self, message: str) -> str:
        """Send a message and get a response.

        Latency is recorded whether or not the agent call succeeds. On
        failure an in-character error string is returned instead of raising,
        and no assistant message is added to the history.
        """
        self.last_active = time.time()
        self.messages.append({"role": "user", "content": message})
        t0 = time.time()
        try:
            response = self.agent.chat(message)
            elapsed_ms = (time.time() - t0) * 1000
            record_latency(self.user_id, self.room, elapsed_ms)
            self.messages.append({"role": "assistant", "content": response})
            return response
        except Exception as e:
            elapsed_ms = (time.time() - t0) * 1000
            record_latency(self.user_id, self.room, elapsed_ms)
            return f"*The green LED flickers.* (Error: {e})"

    def get_summary(self) -> str:
        """Generate a brief conversation summary using the LLM.

        Returns a fixed string for an empty session and a fallback string
        when the agent call fails.
        """
        if not self.messages:
            return "Empty session — no messages exchanged."
        transcript_lines = []
        for m in self.messages[-20:]:  # last 20 messages for brevity
            role = m.get("role", "?").upper()
            content = m.get("content", "")
            transcript_lines.append(f"{role}: {content}")
        transcript = "\n".join(transcript_lines)
        prompt = (
            "Summarize this conversation in 1-3 sentences. "
            "Focus on what the user discussed, asked about, or did.\n\n"
            f"CONVERSATION:\n{transcript}\n\nSUMMARY:"
        )
        try:
            summary = self.agent.chat(prompt)
            return summary.strip()
        except Exception as e:
            return f"Summary unavailable (error: {e}). {len(self.messages)} messages exchanged."

    def get_context_summary(self) -> dict:
        """Get session summary for monitoring (no LLM call)."""
        return {
            "user": self.username,
            "room": self.room,
            "messages": len(self.messages),
            "last_active": datetime.fromtimestamp(self.last_active).isoformat(),
            "created": self.created_at,
        }
class SessionManager:
    """Manages all user sessions.

    Sessions are keyed by user id. Sessions that time out or are evicted
    get a summary appended to ``session_summaries.jsonl`` under WORLD_DIR.
    """

    def __init__(self, max_sessions: int = 20, session_timeout: int = 3600):
        self.sessions: dict[str, UserSession] = {}
        self.max_sessions = max_sessions
        self.session_timeout = session_timeout  # seconds of inactivity
        self._lock = threading.Lock()
        self._summaries_path = WORLD_DIR / 'session_summaries.jsonl'

    def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession:
        """Get existing session or create new one.

        Stale sessions are cleaned up first; when the session limit is
        reached, the least recently active session is evicted to make room.
        """
        with self._lock:
            self._cleanup_stale()
            if user_id not in self.sessions:
                if len(self.sessions) >= self.max_sessions:
                    self._evict_oldest()
                self.sessions[user_id] = UserSession(user_id, username, room)
            session = self.sessions[user_id]
            session.room = room  # Update room if moved
            session.last_active = time.time()
            return session

    def _cleanup_stale(self):
        """Remove sessions that timed out, saving summaries first."""
        now = time.time()
        stale = [uid for uid, s in self.sessions.items()
                 if now - s.last_active > self.session_timeout]
        for uid in stale:
            session = self.sessions[uid]
            self._save_summary(session)
            del self.sessions[uid]

    def _evict_oldest(self):
        """Evict the least recently active session, saving summary first."""
        if not self.sessions:
            return
        oldest = min(self.sessions.items(), key=lambda x: x[1].last_active)
        self._save_summary(oldest[1])
        del self.sessions[oldest[0]]

    def _save_summary(self, session: UserSession):
        """Generate summary via LLM and append to JSONL file.

        Best-effort: any failure is printed and the summary is dropped.
        """
        try:
            summary_text = session.get_summary()
            record = {
                "user_id": session.user_id,
                "username": session.username,
                "room": session.room,
                "message_count": len(session.messages),
                "created_at": session.created_at,
                "ended_at": datetime.now().isoformat(),
                "summary": summary_text,
            }
            # Create the directory first (as save_sessions does) so the
            # summary is not lost when the world directory does not exist yet.
            self._summaries_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self._summaries_path, 'a') as f:
                f.write(json.dumps(record) + '\n')
        except Exception as e:
            print(f"[SessionManager] Failed to save summary for {session.user_id}: {e}")

    def list_sessions(self) -> list:
        """List all active sessions."""
        with self._lock:
            return [s.get_context_summary() for s in self.sessions.values()]

    def get_session_count(self) -> int:
        """Return the number of active sessions."""
        with self._lock:
            return len(self.sessions)

    def save_sessions(self) -> int:
        """Save all active sessions to JSON file. Returns count saved."""
        with self._lock:
            data = {
                "saved_at": datetime.now().isoformat(),
                "sessions": {}
            }
            for uid, session in self.sessions.items():
                data["sessions"][uid] = {
                    "user_id": session.user_id,
                    "username": session.username,
                    "room": session.room,
                    "messages": session.messages,
                    "created_at": session.created_at,
                    "last_active": session.last_active,
                }
            SESSIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
            SESSIONS_FILE.write_text(json.dumps(data, indent=2))
            count = len(data["sessions"])
            if count > 0:
                print(f"[SessionManager] Saved {count} sessions to {SESSIONS_FILE}")
            return count

    def load_sessions(self) -> int:
        """Load sessions from JSON file. Returns count loaded.

        Loading stops once ``max_sessions`` is reached. If an error occurs
        part-way through, the sessions restored so far stay registered and
        are included in the returned count.
        """
        if not SESSIONS_FILE.exists():
            return 0
        loaded = 0
        try:
            data = json.loads(SESSIONS_FILE.read_text())
            sessions_data = data.get("sessions", {})
            with self._lock:
                for uid, sdata in sessions_data.items():
                    if len(self.sessions) >= self.max_sessions:
                        break
                    session = UserSession(
                        sdata["user_id"],
                        sdata["username"],
                        sdata.get("room", "The Threshold"),
                    )
                    session.messages = sdata.get("messages", [])
                    session.created_at = sdata.get("created_at", session.created_at)
                    session.last_active = sdata.get("last_active", time.time())
                    self.sessions[uid] = session
                    loaded += 1
        except Exception as e:
            print(f"[SessionManager] Failed to load sessions: {e}")
        if loaded > 0:
            print(f"[SessionManager] Loaded {loaded} sessions from {SESSIONS_FILE}")
        return loaded
# ── HTTP API ───────────────────────────────────────────────────────────
# Module-level singletons used by the HTTP handler below.
session_manager = SessionManager()
presence_manager = PresenceManager()
notification_manager = NotificationManager()
# Timestamp taken at import; /bridge/stats subtracts it from time.time()
# to report uptime.
_server_start_time = time.time()
# ── Latency Tracking ──────────────────────────────────────────────────
# Rolling window of latency samples, oldest first; guarded by _latencies_lock.
_latencies: list[dict] = []
_latencies_lock = threading.Lock()
# Maximum number of samples kept in the window.
_max_latencies = 1000


def record_latency(user_id: str, room: str, duration_ms: float):
    """Record a latency measurement.

    The duration is rounded to two decimals and stamped with the current
    local time. Once the window exceeds ``_max_latencies`` samples, the
    oldest ones are discarded.
    """
    with _latencies_lock:
        _latencies.append({
            "user_id": user_id,
            "room": room,
            "duration_ms": round(duration_ms, 2),
            "timestamp": datetime.now().isoformat(),
        })
        if len(_latencies) > _max_latencies:
            del _latencies[:len(_latencies) - _max_latencies]


def get_latency_stats() -> dict:
    """Get aggregate latency statistics.

    Returns ``count``, ``average_ms``, ``min_ms``, ``max_ms`` and ``recent``
    (the last ten samples). With no samples recorded the numbers are 0 and
    ``recent`` is an empty list, so the result always has the same keys.
    """
    with _latencies_lock:
        if not _latencies:
            return {"count": 0, "average_ms": 0, "min_ms": 0, "max_ms": 0, "recent": []}
        durations = [e["duration_ms"] for e in _latencies]
        return {
            "count": len(durations),
            "average_ms": round(sum(durations) / len(durations), 2),
            "min_ms": round(min(durations), 2),
            "max_ms": round(max(durations), 2),
            "recent": _latencies[-10:],
        }
class BridgeHandler(BaseHTTPRequestHandler):
"""HTTP handler for multi-user bridge."""
def do_GET(self):
if self.path == '/bridge/health':
state_file = WORLD_DIR / 'world_state.json'
ws = json.loads(state_file.read_text()) if state_file.exists() else {}
self._json_response({
"status": "ok",
"active_sessions": session_manager.get_session_count(),
"tick": ws.get("tick", 0),
"time_of_day": ws.get("time_of_day"),
"weather": ws.get("weather"),
"world_tick_running": world_tick_system._running,
"timestamp": datetime.now().isoformat(),
})
elif self.path == '/bridge/sessions':
self._json_response({
"sessions": session_manager.list_sessions(),
})
elif self.path.startswith('/bridge/world/'):
room = self.path.split('/bridge/world/')[-1]
state_file = WORLD_DIR / 'world_state.json'
if state_file.exists():
state = json.loads(state_file.read_text())
room_data = state.get('rooms', {}).get(room, {})
self._json_response({"room": room, "data": room_data})
else:
self._json_response({"room": room, "data": {}})
elif self.path.startswith('/bridge/room/') and self.path.endswith('/players'):
# GET /bridge/room/<room>/players
parts = self.path.split('/')
# ['', 'bridge', 'room', '<room>', 'players']
room = parts[3]
players = presence_manager.get_players_in_room(room)
self._json_response({"room": room, "players": players})
elif self.path.startswith('/bridge/room/') and self.path.endswith('/events'):
# GET /bridge/room/<room>/events[?since=<timestamp>]
parts = self.path.split('/')
room = parts[3]
from urllib.parse import urlparse, parse_qs
query = parse_qs(urlparse(self.path).query)
since = query.get('since', [None])[0]
events = presence_manager.get_room_events(room, since)
self._json_response({"room": room, "events": events})
elif self.path == '/bridge/world-state':
# GET /bridge/world-state — full world state
state_file = WORLD_DIR / 'world_state.json'
world_state = json.loads(state_file.read_text()) if state_file.exists() else {}
# Build room player lists and conversation counts
rooms = world_state.get('rooms', {})
room_details = {}
for room_name, room_data in rooms.items():
players = presence_manager.get_players_in_room(room_name)
# Count messages across all sessions in this room
with session_manager._lock:
conv_count = sum(
len(s.messages) for s in session_manager.sessions.values()
if s.room == room_name
)
room_details[room_name] = {
**room_data,
"players": players,
"timmy_conversation_count": conv_count,
}
self._json_response({
"world": world_state,
"rooms": room_details,
"active_sessions": session_manager.list_sessions(),
})
elif self.path == '/bridge/latency':
# GET /bridge/latency — latency stats
self._json_response(get_latency_stats())
elif self.path == '/bridge/stats':
# GET /bridge/stats — aggregate stats
total_messages = sum(len(s.messages) for s in session_manager.sessions.values())
# Rooms with at least one player
rooms_with_players = [
room for room in presence_manager._rooms
if presence_manager._rooms[room]
]
uptime_seconds = time.time() - _server_start_time
self._json_response({
"total_sessions": session_manager.get_session_count(),
"total_messages": total_messages,
"rooms_with_players": rooms_with_players,
"rooms_with_players_count": len(rooms_with_players),
"uptime_seconds": round(uptime_seconds, 1),
"timestamp": datetime.now().isoformat(),
})
elif self.path.startswith('/bridge/notifications/'):
# GET /bridge/notifications/<user_id>[?mark_read=false]
user_id = self.path.split('/bridge/notifications/')[-1].rstrip('/')
from urllib.parse import urlparse, parse_qs
query = parse_qs(urlparse(self.path).query)
mark_read = query.get('mark_read', ['true'])[0].lower() != 'false'
notifications = notification_manager.get_pending(user_id, mark_read=mark_read)
self._json_response({
"user_id": user_id,
"notifications": notifications,
"unread_count": notification_manager.get_unread_count(user_id),
})
elif self.path.startswith('/bridge/inventory/'):
# GET /bridge/inventory/<user_id> — list user's inventory items
user_id = self.path.split('/bridge/inventory/')[-1].rstrip('/')
items = inventory_manager.get_inventory(user_id)
self._json_response({
"user_id": user_id,
"items": items,
"count": len(items),
})
elif self.path.startswith('/bridge/quests/'):
# GET /bridge/quests/<user_id> — list active quests for user
user_id = self.path.split('/bridge/quests/')[-1].rstrip('/')
quests = quest_manager.get_user_quests(user_id)
available = quest_manager.get_available_quests()
self._json_response({
"user_id": user_id,
"active_quests": quests,
"available_quests": available,
})
elif self.path.startswith('/bridge/session/') and self.path.endswith('/summary'):
# GET /bridge/session/<user_id>/summary
parts = self.path.split('/')
# ['', 'bridge', 'session', '<user_id>', 'summary']
user_id = parts[3]
with session_manager._lock:
session = session_manager.sessions.get(user_id)
if session:
summary = session.get_summary()
self._json_response({
"user_id": user_id,
"username": session.username,
"room": session.room,
"message_count": len(session.messages),
"summary": summary,
})
else:
# Check saved summaries in JSONL
saved = None
if session_manager._summaries_path.exists():
for line in session_manager._summaries_path.read_text().splitlines():
try:
entry = json.loads(line)
if entry.get("user_id") == user_id:
saved = entry
except json.JSONDecodeError:
continue
if saved:
self._json_response(saved)
else:
self._json_response({"error": "no session or saved summary"}, 404)
elif self.path.startswith('/bridge/combat/status/'):
# GET /bridge/combat/status/<room> — NPC health bars + active fights in room
room = self.path.split('/bridge/combat/status/')[-1].rstrip('/')
status = combat_manager.get_room_combat_status(room)
self._json_response({"room": room, "combat_status": status})
elif self.path == '/bridge/combat/npcs':
# GET /bridge/combat/npcs — list all NPCs
self._json_response({"npcs": combat_manager.get_all_npcs()})
elif self.path == '/bridge/guilds':
# GET /bridge/guilds — list all guilds
guilds = guild_manager.list_guilds()
self._json_response({"ok": True, "guilds": guilds})
elif self.path.startswith('/bridge/guild/') and self.path.endswith('/chat'):
# GET /bridge/guild/<guild_id>/chat[?since=<timestamp>]
parts = self.path.split('/')
guild_id = parts[3] if len(parts) >= 4 else ''
since = None
if '?' in self.path:
qs = self.path.split('?', 1)[1]
for param in qs.split('&'):
if param.startswith('since='):
since = param[6:]
result = guild_manager.get_guild_chat(guild_id, since=since)
if "error" in result:
self._json_response(result, 404)
else:
self._json_response(result)
elif self.path.startswith('/bridge/spells/'):
# GET /bridge/spells/<user_id> — list known spells for user
user_id = self.path.split('/bridge/spells/')[-1].rstrip('/')
spellbook = magic_manager.get_spellbook(user_id)
if spellbook:
self._json_response(spellbook)
else:
# Return all available spells if user has no spellbook yet
self._json_response({
"user_id": user_id,
"mana": 50,
"max_mana": 50,
"known_spells": [],
"available_spells": magic_manager.get_all_spells(),
})
else:
self._json_response({"error": "not found"}, 404)
def do_POST(self):
    """Handle every POST endpoint under ``/bridge/``.

    The request body is expected to be a JSON object; an absent or empty
    body is treated as ``{}``. ``self.path`` selects the action (chat,
    move, say, command, quest, inventory, combat and spell endpoints) and
    every branch answers through ``self._json_response``.

    Error responses:
        400 -- malformed JSON, a JSON body that is not an object, or a
               missing/invalid required field.
        404 -- unknown path, unknown session (move) or unknown quest.
    """
    # Parse defensively: a bad Content-Length, invalid UTF-8 and invalid JSON
    # all raise ValueError (or a subclass) and must yield a 400 rather than an
    # unhandled exception that drops the connection without a reply.
    try:
        content_length = int(self.headers.get('Content-Length', 0))
        # Only read for a positive length; read(-1) would block until EOF.
        body = json.loads(self.rfile.read(content_length)) if content_length > 0 else {}
    except ValueError:
        self._json_response({"error": "invalid JSON body"}, 400)
        return
    # Every branch below calls body.get(), so anything but an object is invalid.
    if not isinstance(body, dict):
        self._json_response({"error": "JSON body must be an object"}, 400)
        return

    if self.path == '/bridge/chat':
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        message = body.get('message', '')
        room = body.get('room', 'The Threshold')
        if not message:
            self._json_response({"error": "no message"}, 400)
            return
        session = session_manager.get_or_create(user_id, username, room)

        # Track presence — enter the room if the user is not already listed.
        occupants = presence_manager.get_players_in_room(room)
        if not occupants or not any(p["user_id"] == user_id for p in occupants):
            enter_event = presence_manager.enter_room(user_id, username, room)
            # Auto-notify: new player joined
            if enter_event:
                notification_manager.broadcast_room(
                    room, "player_join",
                    f"{username} has entered {room}.",
                    exclude_user=user_id, data=enter_event)
            # Plugin hook: on_join
            plugin_msg = plugin_registry.fire_on_join(user_id, room)
            if plugin_msg:
                notification_manager.notify(user_id, "plugin", plugin_msg, room=room, username=username)

        # Plugin hook: on_message (a non-None result replaces the AI response)
        plugin_override = plugin_registry.fire_on_message(user_id, message, room)
        if plugin_override is not None:
            response = plugin_override
        else:
            response = session.chat(message)

        # Auto-notify: crisis detection — scan response for crisis protocol keywords
        crisis_keywords = ["988", "741741", "safe right now", "crisis", "Crisis Text Line"]
        if any(kw in response for kw in crisis_keywords):
            notification_manager.notify(
                user_id, "crisis",
                "Crisis resources have been provided. You are not alone.",
                room=room, username=username,
                data={"response_contains_crisis_protocol": True})
            # Also notify everyone else in the room
            notification_manager.broadcast_room(
                room, "crisis_alert",
                f"Crisis detected for {username} in {room}.",
                exclude_user=user_id,
                data={"triggered_by": user_id, "room": room})

        self._json_response({
            "response": response,
            "user": username,
            "room": room,
            "session_messages": len(session.messages),
        })

    elif self.path == '/bridge/move':
        user_id = body.get('user_id')
        new_room = body.get('room')
        # Without a destination the user would be moved into room ``None``.
        if not new_room:
            self._json_response({"error": "room required"}, 400)
            return
        with session_manager._lock:
            if user_id in session_manager.sessions:
                session = session_manager.sessions[user_id]
                old_room = session.room
                # Leave old room
                leave_event = presence_manager.leave_room(user_id, old_room)
                # Auto-notify: player left
                if leave_event:
                    notification_manager.broadcast_room(
                        old_room, "player_leave",
                        f"{session.username} has left {old_room}.",
                        exclude_user=user_id, data=leave_event)
                # Plugin hook: on_leave
                plugin_leave = plugin_registry.fire_on_leave(user_id, old_room)
                if plugin_leave:
                    notification_manager.broadcast_room(
                        old_room, "plugin", plugin_leave,
                        exclude_user=user_id)
                # Enter new room
                session.room = new_room
                enter_event = presence_manager.enter_room(user_id, session.username, new_room)
                # Auto-notify: player joined
                if enter_event:
                    notification_manager.broadcast_room(
                        new_room, "player_join",
                        f"{session.username} has entered {new_room}.",
                        exclude_user=user_id, data=enter_event)
                self._json_response({
                    "ok": True,
                    "room": new_room,
                    "events": [e for e in [leave_event, enter_event] if e],
                })
            else:
                self._json_response({"error": "no session"}, 404)

    elif self.path == '/bridge/say':
        # POST /bridge/say — user says something visible to all in room
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        message = body.get('message', '')
        room = body.get('room', 'The Threshold')
        if not message:
            self._json_response({"error": "no message"}, 400)
            return
        event = presence_manager.say(user_id, username, room, message)
        # Get list of players who should see it
        players = presence_manager.get_players_in_room(room)
        self._json_response({
            "ok": True,
            "event": event,
            "recipients": players,
        })

    elif self.path == '/bridge/command':
        # POST /bridge/command — parse MUD-style commands
        # Body: { user_id, username, room, command }
        # Commands: look, go <dir>, examine <obj>, say <msg>, ask <msg>
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        raw_command = body.get('command', '').strip()
        if not raw_command:
            self._json_response({"error": "no command"}, 400)
            return
        result = self._parse_mud_command(user_id, username, room, raw_command)
        self._json_response(result)

    elif self.path == '/bridge/quest/complete':
        # POST /bridge/quest/complete — mark objective done
        # Body: { user_id, quest_id, objective_index }
        user_id = body.get('user_id')
        quest_id = body.get('quest_id')
        objective_index = body.get('objective_index')
        if not user_id or not quest_id or objective_index is None:
            self._json_response({"error": "user_id, quest_id, and objective_index required"}, 400)
            return
        # Reject values such as "abc" or [] instead of crashing in int().
        try:
            objective_index = int(objective_index)
        except (TypeError, ValueError):
            self._json_response({"error": "objective_index must be an integer"}, 400)
            return
        result = quest_manager.complete_objective(quest_id, user_id, objective_index)
        if "error" in result:
            self._json_response(result, 400)
        else:
            self._json_response(result)

    elif self.path == '/bridge/quest/assign':
        # POST /bridge/quest/assign — assign a quest to a user
        # Body: { user_id, quest_id }
        user_id = body.get('user_id')
        quest_id = body.get('quest_id')
        if not user_id or not quest_id:
            self._json_response({"error": "user_id and quest_id required"}, 400)
            return
        quest = quest_manager.assign(quest_id, user_id)
        if quest:
            self._json_response({"ok": True, "quest": quest.to_dict()})
        else:
            self._json_response({"error": "Quest not found"}, 404)

    elif self.path == '/bridge/take':
        # POST /bridge/take — pick up item from room
        # Body: { user_id, username, room, item }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        item_name = body.get('item', '')
        if not item_name:
            self._json_response({"error": "item required"}, 400)
            return
        result = inventory_manager.take_item(user_id, username, room, item_name)
        if "error" in result:
            self._json_response(result, 400)
        else:
            # Notify others in room
            notification_manager.broadcast_room(
                room, "item_taken",
                f"{username} picked up {result['item']['name']}.",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/drop':
        # POST /bridge/drop — drop item in room
        # Body: { user_id, username, room, item }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        item_name = body.get('item', '')
        if not item_name:
            self._json_response({"error": "item required"}, 400)
            return
        result = inventory_manager.drop_item(user_id, username, room, item_name)
        if "error" in result:
            self._json_response(result, 400)
        else:
            # Notify others in room
            notification_manager.broadcast_room(
                room, "item_dropped",
                f"{username} dropped {result['item']['name']}.",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/start':
        # POST /bridge/combat/start — begin fight with NPC
        # Body: { user_id, username, room, npc_id }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        npc_id = body.get('npc_id', '')
        if not npc_id:
            self._json_response({"error": "npc_id required"}, 400)
            return
        result = combat_manager.start_fight(user_id, username, room, npc_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            npc = result.get("npc", {})
            notification_manager.broadcast_room(
                room, "combat_start",
                f"{username} has engaged {npc.get('name', 'an enemy')} in combat!",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/attack':
        # POST /bridge/combat/attack — attack NPC in active fight
        # Body: { user_id, username, room }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        result = combat_manager.attack(user_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            enc = result.get("encounter", {})
            npc = result.get("npc", {})
            loot = result.get("loot", [])
            # Broadcast combat action to room
            combat_msg = f"{username} strikes {npc.get('name', 'the enemy')} for {result.get('player_dmg', 0)} damage!"
            notification_manager.broadcast_room(
                room, "combat_action", combat_msg,
                exclude_user=user_id, data=result)
            # If NPC died, broadcast death
            if not npc.get("alive", True):
                death_msg = f"{npc.get('name', 'The enemy')} has been slain by {username}!"
                if loot:
                    death_msg += f" Loot: {', '.join(i['name'] for i in loot)}"
                notification_manager.broadcast_room(
                    room, "combat_death", death_msg, data=result)
            # If player died
            if enc.get("player_hp", 1) <= 0:
                notification_manager.broadcast_room(
                    room, "combat_defeat",
                    f"{username} has been defeated in combat!", data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/defend':
        # POST /bridge/combat/defend — defend (half damage next hit)
        # Body: { user_id, username, room }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        result = combat_manager.defend(user_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            notification_manager.broadcast_room(
                room, "combat_action",
                f"{username} braces defensively.",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/flee':
        # POST /bridge/combat/flee — flee from combat
        # Body: { user_id, username, room }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        result = combat_manager.flee(user_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            notification_manager.broadcast_room(
                room, "combat_flee",
                f"{username} flees from combat!",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/spell/cast':
        # POST /bridge/spell/cast — cast a spell
        # Body: { user_id, username, room, spell_id, target? }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        spell_id = body.get('spell_id', '')
        target = body.get('target')
        if not spell_id:
            self._json_response({"error": "spell_id required"}, 400)
            return
        result = magic_manager.cast_spell(user_id, username, room, spell_id, target)
        if "error" in result:
            self._json_response(result, 400)
        else:
            # Broadcast spell casting to room
            notification_manager.broadcast_room(
                room, "spell_cast",
                result.get("message", f"{username} casts a spell!"),
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/spell/learn':
        # POST /bridge/spell/learn — learn a new spell
        # Body: { user_id, username, spell_id }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        spell_id = body.get('spell_id', '')
        if not spell_id:
            self._json_response({"error": "spell_id required"}, 400)
            return
        result = magic_manager.learn_spell(user_id, username, spell_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            self._json_response(result)

    else:
        self._json_response({"error": "not found"}, 404)
def _parse_mud_command(self, user_id: str, username: str, room: str, command: str) -> dict:
    """Parse and execute a MUD-style command.

    Supported verbs: ``look``, ``go <dir>``, ``examine <obj>``,
    ``say <msg>`` and ``ask <msg>``. The verb is case-insensitive; the
    remainder of the line is passed on as the argument.

    Args:
        user_id: Identifier of the acting user.
        username: Display name used in broadcasts.
        room: Name of the room the user is currently in.
        command: Raw command line as typed by the user.

    Returns:
        A dict that always has a ``"command"`` key. On failure it carries
        an ``"error"`` message; on success it carries verb-specific fields.
    """
    parts = command.split(None, 1)
    verb = parts[0].lower() if parts else ''
    arg = parts[1].strip() if len(parts) > 1 else ''

    state_file = WORLD_DIR / 'world_state.json'
    world_state = json.loads(state_file.read_text()) if state_file.exists() else {}
    rooms = world_state.get('rooms', {})
    room_data = rooms.get(room, {})

    def exits_for(data: dict) -> dict:
        """Prefer explicit world-state exits; otherwise parse the description."""
        return data.get('exits', {}) or self._extract_exits(data.get('description_base', ''))

    if verb == 'look':
        # Return room description and scene
        desc = room_data.get('description_base', 'An empty room.')
        desc_dynamic = room_data.get('description_dynamic', '')
        objects = room_data.get('objects', [])
        players = presence_manager.get_players_in_room(room)
        scene = [desc]
        if desc_dynamic:
            scene.append(desc_dynamic)
        if objects:
            scene.append(f"You see: {', '.join(objects)}.")
        # Show items on the ground
        ground_items = inventory_manager.get_room_items(room)
        if ground_items:
            item_names = [it["name"] for it in ground_items]
            scene.append(f"On the ground: {', '.join(item_names)}.")
        if players:
            names = [p['username'] for p in players if p['user_id'] != user_id]
            if names:
                scene.append(f"Also here: {', '.join(names)}.")
        exits = exits_for(room_data)
        if exits:
            scene.append(f"Exits: {', '.join(exits.keys())}.")
        return {
            "command": "look",
            "room": room,
            "description": "\n".join(scene),
            "objects": objects,
            "exits": exits,
            "players": players,
        }

    elif verb == 'go':
        if not arg:
            return {"command": "go", "error": "Go where? Specify a direction."}
        direction = arg.lower()
        exits = exits_for(room_data)

        def find_exit(matches):
            """Return the destination of the first exit whose lowercased name satisfies *matches*."""
            return next(
                (target for name, target in exits.items() if matches(name.lower())),
                None,
            )

        # Match from most to least specific so that "north" is never captured
        # by "Northeast", and an abbreviation such as "e" resolves to "East"
        # rather than to the first exit that merely contains the letter.
        dest = (
            find_exit(lambda name: name == direction)
            or find_exit(lambda name: name.startswith(direction))
            or find_exit(lambda name: direction in name)
        )
        if not dest:
            return {"command": "go", "error": f"You can't go '{arg}' from here. Exits: {', '.join(exits.keys()) if exits else 'none'}."}
        if dest not in rooms:
            return {"command": "go", "error": f"Unknown destination: {dest}."}

        # Move user — auto-notify
        leave_event = presence_manager.leave_room(user_id, room)
        if leave_event:
            notification_manager.broadcast_room(
                room, "player_leave",
                f"{username} has left {room}.",
                exclude_user=user_id, data=leave_event)
        enter_event = presence_manager.enter_room(user_id, username, dest)
        if enter_event:
            notification_manager.broadcast_room(
                dest, "player_join",
                f"{username} has entered {dest}.",
                exclude_user=user_id, data=enter_event)
        # Update session room
        with session_manager._lock:
            if user_id in session_manager.sessions:
                session_manager.sessions[user_id].room = dest

        # Return new room look
        new_room_data = rooms[dest]
        desc = new_room_data.get('description_base', 'An empty room.')
        desc_dynamic = new_room_data.get('description_dynamic', '')
        objects = new_room_data.get('objects', [])
        scene = [f"You go {arg}.\n"]
        scene.append(desc)
        if desc_dynamic:
            scene.append(desc_dynamic)
        if objects:
            scene.append(f"You see: {', '.join(objects)}.")
        # Same exit resolution as `look`, so the exits reported on arrival
        # agree with what a subsequent `look` / `go` in the new room uses.
        new_exits = exits_for(new_room_data)
        if new_exits:
            scene.append(f"Exits: {', '.join(new_exits.keys())}.")
        return {
            "command": "go",
            "direction": arg,
            "room": dest,
            "description": "\n".join(scene),
            "exits": new_exits,
        }

    elif verb == 'examine':
        if not arg:
            return {"command": "examine", "error": "Examine what?"}
        objects = room_data.get('objects', [])
        # Fuzzy match: either string may contain the other (case-insensitive)
        target = arg.lower()
        matched = None
        for obj in objects:
            if target in obj.lower() or obj.lower() in target:
                matched = obj
                break
        if not matched:
            return {"command": "examine", "error": f"You don't see '{arg}' here. Objects: {', '.join(objects) if objects else 'none'}."}
        # The whiteboard shows the room's current whiteboard entries
        if matched == 'whiteboard':
            wb = room_data.get('whiteboard', [])
            wb_text = "\n".join(f"  - {w}" for w in wb) if wb else "(empty)"
            return {"command": "examine", "object": matched, "description": f"The whiteboard reads:\n{wb_text}"}
        # Default descriptions for known objects
        descriptions = {
            "stone floor": "Smooth stone, worn by countless footsteps.",
            "doorframe": "An archway of fitted stone, humming faintly with energy.",
            "server racks": "Wrought-iron racks filled with humming servers. Green LEDs blink in the dim light.",
            "green LED": "A single green LED, pulsing steadily. Heartbeat.",
            "cot": "A simple cot in the corner. Someone sleeps here sometimes.",
            "anvil": "A heavy iron anvil, scarred and dented from years of use.",
            "hammer": "A well-worn hammer, its handle smooth from use.",
            "tongs": "Iron tongs, blackened by the forge.",
            "hearth": "The hearth glows with banked coals. Warmth radiates outward.",
            "tools": "Hammers, tongs, chisels — all well-used.",
            "stone bench": "A cool stone bench under the oak tree. A good place to sit.",
            "oak tree": "An old oak tree, its branches spreading wide over the garden.",
            "herbs": "Lavender, rosemary, thyme — fragrant and green.",
            "wildflowers": "Small flowers in purple, yellow, and white.",
            "railing": "The bridge railing is worn smooth. Carvings mark the stone.",
            "dark water": "Dark water flows beneath the bridge. You cannot see the bottom.",
        }
        desc = descriptions.get(matched, f"You look at the {matched}. It seems ordinary enough.")
        return {"command": "examine", "object": matched, "description": desc}

    elif verb == 'say':
        if not arg:
            return {"command": "say", "error": "Say what?"}
        event = presence_manager.say(user_id, username, room, arg)
        players = presence_manager.get_players_in_room(room)
        return {
            "command": "say",
            "message": arg,
            "room": room,
            "event": event,
            "recipients": players,
        }

    elif verb == 'ask':
        if not arg:
            return {"command": "ask", "error": "Ask what?"}
        # Forward to chat endpoint logic
        session = session_manager.get_or_create(user_id, username, room)
        occupants = presence_manager.get_players_in_room(room)
        if not occupants or not any(p["user_id"] == user_id for p in occupants):
            presence_manager.enter_room(user_id, username, room)
        response = session.chat(arg)
        return {
            "command": "ask",
            "message": arg,
            "room": room,
            "response": response,
            "session_messages": len(session.messages),
        }

    else:
        return {
            "command": verb,
            "error": f"Unknown command: '{verb}'. Try: look, go <dir>, examine <obj>, say <msg>, ask <msg>",
        }
def _extract_exits(self, description: str) -> dict:
"""Extract exits from room description like 'North to the Tower. East to the Garden.'"""
import re
exits = {}
# Match patterns like "North to the Tower", "East to the Garden"
pattern = r'(North|South|East|West|Up|Down|Northeast|Northwest|Southeast|Southwest)\s+to\s+(?:the\s+)?([A-Z][a-zA-Z\s]+?)(?:\.|$)'
for match in re.finditer(pattern, description):
direction = match.group(1)
destination = match.group(2).strip()
# Reconstruct full room name
room_name = destination if destination.startswith('The ') else f"The {destination}"
exits[direction] = room_name
return exits
def _json_response(self, data: dict, code: int = 200):
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def log_message(self, format, *args):
    """Discard the handler's per-request log output (intentional no-op)."""
    return None
# ── World Tick System ──────────────────────────────────────────────────
import random
class WorldTickSystem:
    """Advances the world state every TICK_INTERVAL seconds.

    On each tick:
      - Increment tick counter
      - Cycle time_of_day (dawn -> morning -> midday -> afternoon -> dusk -> night)
      - Randomly change weather (clear, cloudy, foggy, light rain, heavy rain, storm)
      - Evolve room states:
          * The Forge: fire decays (blazing -> burning -> glowing -> embers -> cold)
          * The Garden: growth advances (seeds -> sprouts -> growing -> blooming -> harvest)
          * The Bridge: rain follows the weather
      - Regenerate mana
      - Save world state to disk (atomically)
      - Broadcast world events to occupied rooms
    """

    TICK_INTERVAL = 60  # seconds
    TIME_CYCLE = ["dawn", "morning", "midday", "afternoon", "dusk", "night"]
    WEATHER_OPTIONS = [
        "clear", "clear", "clear",  # weighted toward clear
        "cloudy", "cloudy",
        "foggy",
        "light rain",
        "heavy rain",
        "storm",
    ]
    FIRE_STAGES = ["cold", "embers", "glowing", "burning", "blazing"]
    GROWTH_STAGES = ["seeds", "sprouts", "growing", "blooming", "harvest"]
    # Weather values that count as rain on The Bridge.
    RAINY_WEATHER = ("light rain", "heavy rain", "storm")

    def __init__(self):
        self._thread: threading.Thread | None = None
        self._running = False
        self._state_lock = threading.Lock()
        # Set by stop() so the loop wakes immediately instead of sleeping on.
        self._stop_event = threading.Event()

    # ── lifecycle ───────────────────────────────────────────────────────
    def start(self):
        """Start the tick loop in a daemon thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        # A fresh event per run: a thread left over from an earlier run keeps
        # its own (already set) event and therefore cannot come back to life.
        self._stop_event = threading.Event()
        self._thread = threading.Thread(
            target=self._loop, args=(self._stop_event,),
            daemon=True, name="world-tick")
        self._thread.start()
        print("[WorldTick] Started — ticking every {}s".format(self.TICK_INTERVAL))

    def stop(self):
        """Ask the tick loop to exit; wakes the thread without waiting for it."""
        self._running = False
        self._stop_event.set()

    # ── internal loop ───────────────────────────────────────────────────
    def _loop(self, stop_event=None):
        """Run one tick per TICK_INTERVAL until stopped.

        Waiting on the event (rather than sleeping) lets stop() end the loop
        at once and guarantees no further tick runs after stop().
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self._running and not stop_event.wait(self.TICK_INTERVAL):
            try:
                self._tick()
            except Exception as e:
                # Keep the loop alive; a single failed tick must not end it.
                print(f"[WorldTick] Error during tick: {e}")

    def _tick(self):
        """Execute one world tick: load, evolve, save, then broadcast."""
        state_file = WORLD_DIR / 'world_state.json'
        events: list[str] = []
        with self._state_lock:
            if state_file.exists():
                state = json.loads(state_file.read_text())
            else:
                state = {"tick": 0, "time_of_day": "morning", "weather": "clear", "rooms": {}}

            # 1. Advance tick counter
            state["tick"] = state.get("tick", 0) + 1
            tick_num = state["tick"]
            # 2. Time of day — advance every tick
            self._advance_time(state, events)
            # 3. Weather — change every 3 ticks
            self._advance_weather(state, tick_num, events)
            # 4. Room states
            rooms = state.get("rooms", {})
            self._evolve_forge(rooms, events)
            self._evolve_garden(rooms, tick_num, events)
            self._evolve_bridge(rooms, state.get("weather", "clear"), events)
            state["rooms"] = rooms
            state["last_updated"] = datetime.now().isoformat()
            # 5. Mana regeneration
            magic_manager.regenerate_mana()
            # 6. Save world state
            self._save_state(state_file, state)

        # 7. Broadcast events to all occupied rooms (outside lock)
        if events:
            self._broadcast(state, events)
        print(f"[WorldTick] tick={state['tick']} time={state['time_of_day']} "
              f"weather={state.get('weather')} events={len(events)}")

    # ── tick steps ──────────────────────────────────────────────────────
    def _advance_time(self, state: dict, events: list) -> None:
        """Move time_of_day to the next entry of TIME_CYCLE, noting dawn/dusk/night."""
        current_time = state.get("time_of_day", "morning")
        try:
            idx = self.TIME_CYCLE.index(current_time)
        except ValueError:
            # Unknown value: treat it as the first entry of the cycle.
            idx = 0
        new_time = self.TIME_CYCLE[(idx + 1) % len(self.TIME_CYCLE)]
        state["time_of_day"] = new_time
        if new_time == "dawn":
            events.append("The sun rises over The Tower. A new day begins.")
        elif new_time == "dusk":
            events.append("The sky darkens. Dusk settles over the world.")
        elif new_time == "night":
            events.append("Night falls. The green LED in The Tower pulses in the dark.")

    def _advance_weather(self, state: dict, tick_num: int, events: list) -> None:
        """Pick a new random weather every third tick."""
        if tick_num % 3 != 0:
            return
        old_weather = state.get("weather", "clear")
        new_weather = random.choice(self.WEATHER_OPTIONS)
        state["weather"] = new_weather
        if new_weather != old_weather:
            events.append(f"The weather shifts to {new_weather}.")

    def _evolve_forge(self, rooms: dict, events: list) -> None:
        """Decay The Forge's fire one stage once it has gone 2 ticks untouched."""
        forge = rooms.get("The Forge", {})
        if not forge:
            return
        fire = forge.get("fire_state", "cold")
        untouched = forge.get("fire_untouched_ticks", 0) + 1
        forge["fire_untouched_ticks"] = untouched
        if untouched >= 2 and fire != "cold":
            try:
                fi = self.FIRE_STAGES.index(fire)
            except ValueError:
                fi = 0
            if fi > 0:
                forge["fire_state"] = self.FIRE_STAGES[fi - 1]
                forge["fire_untouched_ticks"] = 0
                if forge["fire_state"] == "embers":
                    events.append("The forge fire has died down to embers.")
                elif forge["fire_state"] == "cold":
                    events.append("The forge fire has gone cold.")
        rooms["The Forge"] = forge

    def _evolve_garden(self, rooms: dict, tick_num: int, events: list) -> None:
        """Advance The Garden's growth one stage every 4 ticks, wrapping after harvest."""
        garden = rooms.get("The Garden", {})
        if not garden:
            return
        if tick_num % 4 == 0:
            growth = garden.get("growth_stage", "seeds")
            try:
                gi = self.GROWTH_STAGES.index(growth)
            except ValueError:
                gi = 0
            if gi < len(self.GROWTH_STAGES) - 1:
                garden["growth_stage"] = self.GROWTH_STAGES[gi + 1]
                events.append(f"The garden has advanced to {garden['growth_stage']}.")
            else:
                # Reset cycle
                garden["growth_stage"] = "seeds"
                events.append("The garden has been harvested. New seeds are planted.")
        rooms["The Garden"] = garden

    def _evolve_bridge(self, rooms: dict, weather: str, events: list) -> None:
        """Make The Bridge's rain flag follow the current weather."""
        bridge = rooms.get("The Bridge", {})
        if not bridge:
            return
        is_raining = weather in self.RAINY_WEATHER
        was_raining = bridge.get("rain_active", False)
        bridge["rain_active"] = is_raining
        if is_raining and not was_raining:
            events.append("Rain begins to fall on The Bridge.")
        elif not is_raining and was_raining:
            events.append("The rain on The Bridge lets up.")
        rooms["The Bridge"] = bridge

    @staticmethod
    def _save_state(state_file, state: dict) -> None:
        """Write *state* as JSON to *state_file* atomically.

        The data goes to a sibling temp file that is then renamed over the
        target, so request threads reading world_state.json concurrently
        never observe a partially written file.
        """
        tmp_file = state_file.with_name(state_file.name + ".tmp")
        tmp_file.write_text(json.dumps(state, indent=2))
        tmp_file.replace(state_file)

    def _broadcast(self, state: dict, events: list) -> None:
        """Send this tick's events to every room that currently has players."""
        event_text = " ".join(events)
        for room_name in list(presence_manager._rooms.keys()):
            players = presence_manager.get_players_in_room(room_name)
            if players:
                notification_manager.broadcast_room(
                    room_name, "world_tick", event_text,
                    data={"tick": state["tick"], "time_of_day": state["time_of_day"],
                          "weather": state.get("weather"), "events": events},
                )
# Module-level singleton; main() starts its background tick thread.
world_tick_system = WorldTickSystem()
# ── Main ───────────────────────────────────────────────────────────────
def main():
    """Print the startup banner, start the world tick and serve HTTP until interrupted.

    On Ctrl-C (or any error that ends ``serve_forever``) the tick loop is
    asked to stop and the listening socket is closed, so the process shuts
    down cleanly instead of exiting with a traceback and an open socket.
    """
    print(f"Multi-User AI Bridge starting on {BRIDGE_HOST}:{BRIDGE_PORT}")
    print(f"World dir: {WORLD_DIR}")
    print(f"Max sessions: {session_manager.max_sessions}")
    print()
    print("Endpoints:")
    print("  GET  /bridge/health        — Health check")
    print("  GET  /bridge/sessions      — List active sessions")
    print("  GET  /bridge/world-state   — Full world state (rooms, players, convos)")
    print("  GET  /bridge/stats         — Aggregate stats (sessions, messages, uptime)")
    print("  GET  /bridge/room/<room>/players — List players in a room")
    print("  GET  /bridge/room/<room>/events  — Room events (presence + chat)")
    print("  GET  /bridge/session/<id>/summary — Get conversation summary")
    print("  POST /bridge/chat          — Send message to Timmy")
    print("  POST /bridge/say           — Say something to room (visible to all)")
    print("  POST /bridge/move          — Move user to room (triggers presence)")
    print("  POST /bridge/command       — MUD command parser (look, go, examine, say, ask)")
    print()

    # Start world tick system
    world_tick_system.start()
    try:
        # Binding happens inside the try so a failed bind (e.g. port in use)
        # still stops the tick system on the way out.
        server = ThreadingHTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\nShutting down bridge...")
        finally:
            # Release the listening socket.
            server.server_close()
    finally:
        world_tick_system.stop()
# Run the bridge only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()