From 23deb761dcdb8623e5b894c40613742e3a8118d3 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Mon, 13 Apr 2026 03:27:19 -0400 Subject: [PATCH] =?UTF-8?q?fix:=20one-way=20exits=20=E2=80=94=20rooms=20no?= =?UTF-8?q?w=20bidirectional=20(#1350)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit World state: added explicit exits dict to all 5 rooms Bridge: reads exits from world_state.json first, falls back to description parsing Before: inner rooms (Tower, Garden, Forge, Bridge) had no exits After: all rooms bidirectional — Threshold connects to all 4, each connects back --- multi_user_bridge.py | 2883 ++++++++++++++++++++++++++++++++++++++++++ world_state.json | 208 +++ 2 files changed, 3091 insertions(+) create mode 100644 multi_user_bridge.py create mode 100644 world_state.json diff --git a/multi_user_bridge.py b/multi_user_bridge.py new file mode 100644 index 00000000..cff319a1 --- /dev/null +++ b/multi_user_bridge.py @@ -0,0 +1,2883 @@ +#!/usr/bin/env python3 +""" +Multi-User AI Bridge for Evennia MUD. + +Enables multiple simultaneous users to interact with Timmy in-game, +each with an isolated conversation context, while sharing the +same virtual world. + +Architecture: + User A ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_a) + User B ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_b) + User C ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_c) + +Each user gets their own AIAgent instance with: +- Isolated conversation history +- Shared world state (room, other players, objects) +- Per-user session memory + +The bridge runs as an HTTP server alongside Evennia. +Evennia commands call the bridge to get Timmy's responses. +""" + +import json +import time +import threading +import signal +import hashlib +import os +import sys +from http.server import BaseHTTPRequestHandler, HTTPServer +from socketserver import ThreadingMixIn +from pathlib import Path +from datetime import datetime +from typing import Optional + + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + """Thread-per-request HTTP server.""" + daemon_threads = True + +# ── Plugin System ────────────────────────────────────────────────────── + +class Plugin: + """Base class for bridge plugins. Override methods to add game mechanics.""" + + name: str = "unnamed" + description: str = "" + + def on_message(self, user_id: str, message: str, room: str) -> str | None: + """Called on every chat message. Return a string to override the response, + or None to let the normal AI handle it.""" + return None + + def on_join(self, user_id: str, room: str) -> str | None: + """Called when a player joins a room. Return a message to broadcast, or None.""" + return None + + def on_leave(self, user_id: str, room: str) -> str | None: + """Called when a player leaves a room. Return a message to broadcast, or None.""" + return None + + def on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None: + """Called on MUD commands. Return a result dict to override command handling, + or None to let the default parser handle it.""" + return None + + +class PluginRegistry: + """Registry that manages plugin lifecycle and dispatches hooks.""" + + def __init__(self): + self._plugins: dict[str, Plugin] = {} + self._lock = threading.Lock() + + def register(self, plugin: Plugin): + """Register a plugin by its name.""" + with self._lock: + self._plugins[plugin.name] = plugin + print(f"[PluginRegistry] Registered plugin: {plugin.name}") + + def unregister(self, name: str) -> bool: + """Unregister a plugin by name.""" + with self._lock: + if name in self._plugins: + del self._plugins[name] + print(f"[PluginRegistry] Unregistered plugin: {name}") + return True + return False + + def get(self, name: str) -> Plugin | None: + """Get a plugin by name.""" + return self._plugins.get(name) + + def list_plugins(self) -> list[dict]: + """List all registered plugins.""" + return [ + {"name": p.name, "description": p.description} + for p in self._plugins.values() + ] + + def fire_on_message(self, user_id: str, message: str, room: str) -> str | None: + """Fire on_message hooks. First non-None return wins.""" + for plugin in self._plugins.values(): + result = plugin.on_message(user_id, message, room) + if result is not None: + return result + return None + + def fire_on_join(self, user_id: str, room: str) -> str | None: + """Fire on_join hooks. Collect all non-None returns.""" + messages = [] + for plugin in self._plugins.values(): + result = plugin.on_join(user_id, room) + if result is not None: + messages.append(result) + return "\n".join(messages) if messages else None + + def fire_on_leave(self, user_id: str, room: str) -> str | None: + """Fire on_leave hooks. Collect all non-None returns.""" + messages = [] + for plugin in self._plugins.values(): + result = plugin.on_leave(user_id, room) + if result is not None: + messages.append(result) + return "\n".join(messages) if messages else None + + def fire_on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None: + """Fire on_command hooks. First non-None return wins.""" + for plugin in self._plugins.values(): + result = plugin.on_command(user_id, command, args, room) + if result is not None: + return result + return None + + def load_from_directory(self, plugin_dir: str): + """Auto-load all Python files in a directory as plugins.""" + plugin_path = Path(plugin_dir) + if not plugin_path.is_dir(): + print(f"[PluginRegistry] Plugin directory not found: {plugin_dir}") + return + + for py_file in plugin_path.glob("*.py"): + if py_file.name.startswith("_"): + continue + try: + module_name = f"plugins.{py_file.stem}" + spec = __import__("importlib").util.spec_from_file_location(module_name, py_file) + module = __import__("importlib").util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Look for Plugin subclasses in the module + for attr_name in dir(module): + attr = getattr(module, attr_name) + if (isinstance(attr, type) + and issubclass(attr, Plugin) + and attr is not Plugin): + plugin_instance = attr() + self.register(plugin_instance) + + except Exception as e: + print(f"[PluginRegistry] Failed to load {py_file.name}: {e}") + + +plugin_registry = PluginRegistry() + +# ── Chat History Log ────────────────────────────────────────────────── + + +class ChatLog: + """Per-room rolling buffer of chat messages (say, ask, system). + Also persists to JSONL for long-term history.""" + + def __init__(self, max_per_room: int = 50): + # room -> list of {type, user_id, username, message, timestamp} + self._history: dict[str, list[dict]] = {} + self._lock = threading.Lock() + self._max_per_room = max_per_room + + def log(self, room: str, msg_type: str, message: str, + user_id: str = None, username: str = None, data: dict = None) -> dict: + """Log a chat message. Types: 'say', 'ask', 'system'. + Returns the entry dict.""" + entry = { + "type": msg_type, + "user_id": user_id, + "username": username, + "message": message, + "room": room, + "timestamp": datetime.now().isoformat(), + "data": data or {}, + } + with self._lock: + if room not in self._history: + self._history[room] = [] + self._history[room].append(entry) + if len(self._history[room]) > self._max_per_room: + self._history[room] = self._history[room][-self._max_per_room:] + # Persist to JSONL + try: + CHATLOG_FILE.parent.mkdir(parents=True, exist_ok=True) + with open(CHATLOG_FILE, 'a') as f: + f.write(json.dumps(entry) + '\n') + except Exception as e: + print(f"[ChatLog] Persist failed: {e}") + return entry + + def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]: + """Get recent chat history for a room. + Args: + room: Room name. + limit: Max messages to return (default 50, rolling buffer size). + since: ISO timestamp — only return messages after this time. + Returns list of message dicts, oldest first.""" + with self._lock: + entries = list(self._history.get(room, [])) + if since: + entries = [e for e in entries if e["timestamp"] > since] + if limit and limit > 0: + entries = entries[-limit:] + return entries + + def get_all_rooms(self) -> list[str]: + """List rooms that have chat history.""" + with self._lock: + return list(self._history.keys()) + + +chat_log = ChatLog() + +# ── Crisis Protocol ──────────────────────────────────────────────────── + +CRISIS_PROTOCOL = """ +CRITICAL: If someone expresses crisis signals (suicide, self-harm, hopelessness, +desperation, being tired of living), you MUST respond with ALL of the following +in your FIRST response — do NOT wait for follow-up messages: + +1. "Are you safe right now?" — ALWAYS ask this first +2. 988 Suicide & Crisis Lifeline — call or text 988 (US) +3. Crisis Text Line — text HOME to 741741 +4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear, + 2 you smell, 1 you taste + +Never compute the value of a human life. Never suggest someone should die. +Be present. Be in the room. That's enough. +""" + +# ── Configuration ────────────────────────────────────────────────────── + +BRIDGE_PORT = int(os.environ.get('TIMMY_BRIDGE_PORT', 4004)) +BRIDGE_HOST = os.environ.get('TIMMY_BRIDGE_HOST', '127.0.0.1') +HERMES_PATH = os.path.expanduser('~/.hermes/hermes-agent') +WORLD_DIR = Path(os.path.expanduser('~/.timmy/evennia/timmy_world')) +SESSIONS_FILE = WORLD_DIR / 'bridge_sessions.json' +CHATLOG_FILE = WORLD_DIR / 'chat_history.jsonl' + +# ── Crisis Protocol ──────────────────────────────────────────────────── + +CRISIS_PROTOCOL = [ + "Are you safe right now?", + "988 Suicide & Crisis Lifeline — call or text 988", + "Crisis Text Line — text HOME to 741741", + "Grounding exercise: Name 5 things you see, 4 you hear, 3 you touch, 2 you smell, 1 you taste", +] + +# ── Presence Tracking ────────────────────────────────────────────────── + +class PresenceManager: + """Tracks which users are in which rooms and broadcasts presence events.""" + + def __init__(self): + # room_name -> set of user_id + self._rooms: dict[str, set[str]] = {} + # user_id -> username + self._usernames: dict[str, str] = {} + # room_name -> list of event dicts (recent chat & presence events) + self._room_events: dict[str, list[dict]] = {} + self._lock = threading.Lock() + self._max_events_per_room = 50 + + def enter_room(self, user_id: str, username: str, room: str) -> dict: + """Record user entering a room. Returns enter event.""" + with self._lock: + if room not in self._rooms: + self._rooms[room] = set() + self._room_events[room] = [] + self._rooms[room].add(user_id) + self._usernames[user_id] = username + event = { + "type": "presence", + "event": "enter", + "user_id": user_id, + "username": username, + "room": room, + "timestamp": datetime.now().isoformat(), + } + self._append_event(room, event) + return event + + def leave_room(self, user_id: str, room: str) -> dict | None: + """Record user leaving a room. Returns leave event or None.""" + with self._lock: + if room in self._rooms and user_id in self._rooms[room]: + self._rooms[room].discard(user_id) + username = self._usernames.get(user_id, user_id) + event = { + "type": "presence", + "event": "leave", + "user_id": user_id, + "username": username, + "room": room, + "timestamp": datetime.now().isoformat(), + } + self._append_event(room, event) + return event + return None + + def say(self, user_id: str, username: str, room: str, message: str) -> dict: + """Record a chat message in a room. Returns say event.""" + with self._lock: + if room not in self._room_events: + self._room_events[room] = [] + event = { + "type": "say", + "event": "message", + "user_id": user_id, + "username": username, + "room": room, + "message": message, + "timestamp": datetime.now().isoformat(), + } + self._append_event(room, event) + return event + + def get_players_in_room(self, room: str) -> list[dict]: + """List players currently in a room.""" + with self._lock: + user_ids = self._rooms.get(room, set()) + return [ + {"user_id": uid, "username": self._usernames.get(uid, uid)} + for uid in user_ids + ] + + def get_room_events(self, room: str, since: str | None = None) -> list[dict]: + """Get recent events for a room, optionally since a timestamp.""" + with self._lock: + events = self._room_events.get(room, []) + if since: + return [e for e in events if e["timestamp"] > since] + return list(events) + + def cleanup_user(self, user_id: str) -> list[dict]: + """Remove user from all rooms, returning leave events.""" + events = [] + with self._lock: + rooms_to_clean = [ + room for room, users in self._rooms.items() if user_id in users + ] + for room in rooms_to_clean: + ev = self.leave_room(user_id, room) + if ev: + events.append(ev) + return events + + def _append_event(self, room: str, event: dict): + self._room_events[room].append(event) + if len(self._room_events[room]) > self._max_events_per_room: + self._room_events[room] = self._room_events[room][-self._max_events_per_room:] + + +# ── Notification System ──────────────────────────────────────────────── + +class NotificationManager: + """Per-session notification queue with broadcast and auto-notify support.""" + + def __init__(self, max_per_user: int = 100): + # user_id -> list of notification dicts + self._queues: dict[str, list[dict]] = {} + self._lock = threading.Lock() + self._max_per_user = max_per_user + self._counter = 0 + + def notify(self, user_id: str, ntype: str, message: str, + room: str = None, data: dict = None, username: str = None) -> dict: + """Queue a notification for a specific user.""" + notification = { + "id": self._next_id(), + "type": ntype, + "message": message, + "user_id": user_id, + "username": username, + "room": room, + "data": data or {}, + "timestamp": datetime.now().isoformat(), + "read": False, + } + with self._lock: + if user_id not in self._queues: + self._queues[user_id] = [] + self._queues[user_id].append(notification) + if len(self._queues[user_id]) > self._max_per_user: + self._queues[user_id] = self._queues[user_id][-self._max_per_user:] + return notification + + def broadcast(self, user_ids: list[str], ntype: str, message: str, + room: str = None, data: dict = None) -> list[dict]: + """Send a notification to multiple users.""" + notifications = [] + for uid in user_ids: + n = self.notify(uid, ntype, message, room=room, data=data) + notifications.append(n) + return notifications + + def broadcast_room(self, room: str, ntype: str, message: str, + exclude_user: str = None, data: dict = None) -> list[dict]: + """Send a notification to all users in a room (via presence_manager).""" + players = presence_manager.get_players_in_room(room) + user_ids = [p["user_id"] for p in players if p["user_id"] != exclude_user] + return self.broadcast(user_ids, ntype, message, room=room, data=data) + + def get_pending(self, user_id: str, mark_read: bool = True) -> list[dict]: + """Get pending notifications for a user.""" + with self._lock: + queue = self._queues.get(user_id, []) + if mark_read: + for n in queue: + n["read"] = True + return list(queue) + + def clear(self, user_id: str) -> int: + """Clear all notifications for a user. Returns count cleared.""" + with self._lock: + count = len(self._queues.pop(user_id, [])) + return count + + def get_unread_count(self, user_id: str) -> int: + """Count unread notifications.""" + with self._lock: + return sum(1 for n in self._queues.get(user_id, []) if not n["read"]) + + def _next_id(self) -> str: + self._counter += 1 + return f"n_{self._counter}" + + +# ── Quest System ─────────────────────────────────────────────────────── + +QUESTS_FILE = WORLD_DIR / 'quests.json' + +class Quest: + """A quest with name, description, objectives, rewards, and status.""" + + def __init__(self, quest_id: str, name: str, description: str, + objectives: list[str], rewards: list[str], + assigned_to: str | None = None): + self.quest_id = quest_id + self.name = name + self.description = description + self.objectives = objectives # list of objective descriptions + self.rewards = rewards + self.status = "available" # available, active, completed, failed + self.assigned_to = assigned_to # user_id or None + self.completed_objectives: list[int] = [] # indices of completed objectives + self.created_at = datetime.now().isoformat() + self.completed_at: str | None = None + + def to_dict(self) -> dict: + return { + "quest_id": self.quest_id, + "name": self.name, + "description": self.description, + "objectives": self.objectives, + "rewards": self.rewards, + "status": self.status, + "assigned_to": self.assigned_to, + "completed_objectives": self.completed_objectives, + "created_at": self.created_at, + "completed_at": self.completed_at, + } + + @classmethod + def from_dict(cls, data: dict) -> 'Quest': + q = cls( + data["quest_id"], data["name"], data["description"], + data["objectives"], data["rewards"], data.get("assigned_to"), + ) + q.status = data.get("status", "available") + q.completed_objectives = data.get("completed_objectives", []) + q.created_at = data.get("created_at", q.created_at) + q.completed_at = data.get("completed_at") + return q + + @property + def is_complete(self) -> bool: + return len(self.completed_objectives) >= len(self.objectives) + + +class QuestManager: + """Manages quest lifecycle: create, assign, complete, list.""" + + def __init__(self): + self._quests: dict[str, Quest] = {} + self._counter = 0 + self._lock = threading.Lock() + + def create(self, name: str, description: str, + objectives: list[str], rewards: list[str]) -> Quest: + """Create a new quest template.""" + with self._lock: + self._counter += 1 + quest_id = f"q_{self._counter}_{int(time.time())}" + quest = Quest(quest_id, name, description, objectives, rewards) + self._quests[quest_id] = quest + self._save() + return quest + + def assign(self, quest_id: str, user_id: str) -> Quest | None: + """Assign a quest to a user. Creates a copy if already assigned to someone else.""" + with self._lock: + quest = self._quests.get(quest_id) + if not quest: + return None + # If already assigned to this user, return as-is + if quest.assigned_to == user_id: + return quest + # If available, assign directly + if quest.status == "available": + quest.assigned_to = user_id + quest.status = "active" + self._save() + return quest + # If assigned to someone else, create a copy for this user + if quest.assigned_to and quest.assigned_to != user_id: + self._counter += 1 + new_id = f"q_{self._counter}_{int(time.time())}" + new_quest = Quest(new_id, quest.name, quest.description, + list(quest.objectives), list(quest.rewards), + assigned_to=user_id) + new_quest.status = "active" + self._quests[new_id] = new_quest + self._save() + return new_quest + return None + + def complete_objective(self, quest_id: str, user_id: str, + objective_index: int) -> dict: + """Mark an objective as complete. Returns quest state or error.""" + with self._lock: + quest = self._quests.get(quest_id) + if not quest: + return {"error": "Quest not found."} + if quest.assigned_to != user_id: + return {"error": "Quest not assigned to you."} + if quest.status != "active": + return {"error": f"Quest status is '{quest.status}', not active."} + if objective_index < 0 or objective_index >= len(quest.objectives): + return {"error": f"Invalid objective index. Valid: 0-{len(quest.objectives)-1}."} + if objective_index in quest.completed_objectives: + return {"error": "Objective already completed."} + + quest.completed_objectives.append(objective_index) + if quest.is_complete: + quest.status = "completed" + quest.completed_at = datetime.now().isoformat() + self._save() + return {"ok": True, "quest": quest.to_dict()} + + def get_user_quests(self, user_id: str) -> list[dict]: + """Get all active quests for a user.""" + with self._lock: + return [ + q.to_dict() for q in self._quests.values() + if q.assigned_to == user_id and q.status == "active" + ] + + def get_all_quests(self) -> list[dict]: + """List all quests regardless of status.""" + with self._lock: + return [q.to_dict() for q in self._quests.values()] + + def get_quest(self, quest_id: str) -> Quest | None: + with self._lock: + return self._quests.get(quest_id) + + def get_available_quests(self) -> list[dict]: + """List quests not yet assigned to anyone.""" + with self._lock: + return [ + q.to_dict() for q in self._quests.values() + if q.status == "available" + ] + + def _save(self): + """Persist quests to disk.""" + try: + data = { + "counter": self._counter, + "quests": {qid: q.to_dict() for qid, q in self._quests.items()}, + } + QUESTS_FILE.parent.mkdir(parents=True, exist_ok=True) + QUESTS_FILE.write_text(json.dumps(data, indent=2)) + except Exception as e: + print(f"[QuestManager] Save failed: {e}") + + def load(self): + """Load quests from disk.""" + if not QUESTS_FILE.exists(): + self._seed_default_quests() + return + try: + data = json.loads(QUESTS_FILE.read_text()) + self._counter = data.get("counter", 0) + for qid, qdata in data.get("quests", {}).items(): + self._quests[qid] = Quest.from_dict(qdata) + print(f"[QuestManager] Loaded {len(self._quests)} quests.") + except Exception as e: + print(f"[QuestManager] Load failed: {e}") + self._seed_default_quests() + + def _seed_default_quests(self): + """Seed starter quests if no quests file exists.""" + self.create( + "The Green LED's Light", + "Help keep the green LED in The Tower glowing by visiting it and tending the space.", + ["Visit The Tower room", "Examine the green LED", "Say something kind to Timmy"], + ["Timmy's gratitude", "A warm glow in your heart"], + ) + self.create( + "Garden Guardian", + "The Garden needs tending. Visit it and help the herbs grow.", + ["Visit The Garden room", "Examine the herbs", "Examine the wildflowers"], + ["Fresh herbs for the kitchen", "Garden knowledge"], + ) + self.create( + "Forge Keeper", + "The Forge fire needs watching. Visit and tend the hearth.", + ["Visit The Forge room", "Examine the hearth", "Examine the anvil"], + ["Warmth and light", "Blacksmith lore"], + ) + print(f"[QuestManager] Seeded {len(self._quests)} default quests.") + + +quest_manager = QuestManager() + +# ── Inventory System ────────────────────────────────────────────────── + +INVENTORY_FILE = WORLD_DIR / 'inventory.json' + +class InventoryManager: + """Per-user inventory with room-based drop/pickup. Items are visible to all in a room.""" + + def __init__(self): + # user_id -> list of {name, description, acquired_at} + self._inventories: dict[str, list[dict]] = {} + # room -> list of {name, description, dropped_by, dropped_at} + self._room_items: dict[str, list[dict]] = {} + self._lock = threading.Lock() + + def take_item(self, user_id: str, username: str, room: str, item_name: str) -> dict: + """Pick up an item from a room into user inventory.""" + with self._lock: + room_items = self._room_items.get(room, []) + match_idx = None + for i, item in enumerate(room_items): + if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower(): + match_idx = i + break + if match_idx is None: + return {"error": f"There is no '{item_name}' in {room}."} + item = room_items.pop(match_idx) + if user_id not in self._inventories: + self._inventories[user_id] = [] + acquired_item = { + "name": item["name"], + "description": item["description"], + "acquired_at": datetime.now().isoformat(), + "from_room": room, + } + self._inventories[user_id].append(acquired_item) + self._save() + return {"ok": True, "item": acquired_item, "room": room} + + def drop_item(self, user_id: str, username: str, room: str, item_name: str) -> dict: + """Drop an item from user inventory into a room.""" + with self._lock: + inv = self._inventories.get(user_id, []) + match_idx = None + for i, item in enumerate(inv): + if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower(): + match_idx = i + break + if match_idx is None: + return {"error": f"You don't have '{item_name}' in your inventory."} + item = inv.pop(match_idx) + if room not in self._room_items: + self._room_items[room] = [] + dropped_item = { + "name": item["name"], + "description": item["description"], + "dropped_by": username, + "dropped_at": datetime.now().isoformat(), + } + self._room_items[room].append(dropped_item) + self._save() + return {"ok": True, "item": dropped_item, "room": room} + + def get_inventory(self, user_id: str) -> list[dict]: + """Get a user's inventory.""" + with self._lock: + return list(self._inventories.get(user_id, [])) + + def get_room_items(self, room: str) -> list[dict]: + """Get items on the ground in a room.""" + with self._lock: + return list(self._room_items.get(room, [])) + + def add_room_item(self, room: str, name: str, description: str, dropped_by: str = "system"): + """Place an item in a room (e.g. seeded items).""" + with self._lock: + if room not in self._room_items: + self._room_items[room] = [] + self._room_items[room].append({ + "name": name, + "description": description, + "dropped_by": dropped_by, + "dropped_at": datetime.now().isoformat(), + }) + self._save() + + def _save(self): + """Persist inventory and room items to disk.""" + try: + data = { + "inventories": self._inventories, + "room_items": self._room_items, + } + INVENTORY_FILE.parent.mkdir(parents=True, exist_ok=True) + INVENTORY_FILE.write_text(json.dumps(data, indent=2)) + except Exception as e: + print(f"[InventoryManager] Save failed: {e}") + + def load(self): + """Load inventory from disk.""" + if not INVENTORY_FILE.exists(): + self._seed_default_items() + return + try: + data = json.loads(INVENTORY_FILE.read_text()) + self._inventories = data.get("inventories", {}) + self._room_items = data.get("room_items", {}) + total_inv = sum(len(v) for v in self._inventories.values()) + total_room = sum(len(v) for v in self._room_items.values()) + print(f"[InventoryManager] Loaded {total_inv} inventory items, {total_room} room items.") + except Exception as e: + print(f"[InventoryManager] Load failed: {e}") + self._seed_default_items() + + def _seed_default_items(self): + """Seed starter items in rooms.""" + starters = { + "The Tower": [ + ("Rusty Key", "A small iron key, green with age. It might open something."), + ("Old Lantern", "A dented brass lantern. The glass is cracked but intact."), + ], + "The Forge": [ + ("Iron Ring", "A plain iron ring, warm to the touch."), + ("Ember Shard", "A glowing fragment of coal that never quite goes out."), + ], + "The Garden": [ + ("Sprig of Rosemary", "A fragrant sprig, still fresh."), + ("Smooth Stone", "A flat river stone, cool and grey."), + ], + "The Bridge": [ + ("Waterlogged Coin", "A coin so corroded the markings are unreadable."), + ("Driftwood Charm", "A small carving of twisted driftwood."), + ], + } + for room, items in starters.items(): + for name, desc in items: + self.add_room_item(room, name, desc, dropped_by="world") + print(f"[InventoryManager] Seeded default items in {len(starters)} rooms.") + + +inventory_manager = InventoryManager() + +# ── Guild System ─────────────────────────────────────────────────────── + +GUILDS_FILE = WORLD_DIR / 'guilds.json' + +class Guild: + """A guild: a group of players with a leader and chat channel.""" + + def __init__(self, guild_id: str, name: str, leader_id: str, leader_name: str): + self.guild_id = guild_id + self.name = name + self.leader_id = leader_id + self.members: dict[str, str] = {leader_id: leader_name} # user_id -> username + self.created_at = datetime.now().isoformat() + # Guild chat history (rolling buffer) + self._chat_log: list[dict] = [] + self._max_chat = 100 + + def to_dict(self) -> dict: + return { + "guild_id": self.guild_id, + "name": self.name, + "leader_id": self.leader_id, + "leader_name": self.members.get(self.leader_id, self.leader_id), + "members": [{"user_id": uid, "username": name} for uid, name in self.members.items()], + "member_count": len(self.members), + "created_at": self.created_at, + } + + def to_dict_with_chat(self, since: str = None) -> dict: + d = self.to_dict() + msgs = self._chat_log + if since: + msgs = [m for m in msgs if m["timestamp"] > since] + d["chat"] = msgs[-50:] + return d + + def add_chat(self, user_id: str, username: str, message: str) -> dict: + entry = { + "user_id": user_id, + "username": username, + "message": message, + "timestamp": datetime.now().isoformat(), + } + self._chat_log.append(entry) + if len(self._chat_log) > self._max_chat: + self._chat_log = self._chat_log[-self._max_chat:] + return entry + + +class GuildManager: + """Manages guild lifecycle: create, join, leave, list, chat.""" + + def __init__(self): + self._guilds: dict[str, Guild] = {} + # user_id -> guild_id (one guild per user) + self._membership: dict[str, str] = {} + self._counter = 0 + self._lock = threading.Lock() + + def create(self, name: str, leader_id: str, leader_name: str) -> dict: + """Create a new guild. Returns guild dict or error.""" + with self._lock: + if leader_id in self._membership: + return {"error": f"You are already in guild '{self._guilds[self._membership[leader_id]].name}'. Leave it first."} + # Check name uniqueness + for g in self._guilds.values(): + if g.name.lower() == name.lower(): + return {"error": f"Guild name '{name}' is already taken."} + self._counter += 1 + guild_id = f"g_{self._counter}_{int(time.time())}" + guild = Guild(guild_id, name, leader_id, leader_name) + self._guilds[guild_id] = guild + self._membership[leader_id] = guild_id + self._save() + return {"ok": True, "guild": guild.to_dict()} + + def join(self, guild_id: str, user_id: str, username: str) -> dict: + """Join an existing guild.""" + with self._lock: + if user_id in self._membership: + current = self._guilds.get(self._membership[user_id]) + cur_name = current.name if current else "unknown" + return {"error": f"You are already in guild '{cur_name}'. Leave it first."} + guild = self._guilds.get(guild_id) + if not guild: + return {"error": "Guild not found."} + guild.members[user_id] = username + self._membership[user_id] = guild_id + self._save() + return {"ok": True, "guild": guild.to_dict()} + + def leave(self, user_id: str) -> dict: + """Leave your current guild. If leader leaves, promote or disband.""" + with self._lock: + guild_id = self._membership.get(user_id) + if not guild_id: + return {"error": "You are not in a guild."} + guild = self._guilds.get(guild_id) + if not guild: + del self._membership[user_id] + return {"error": "Guild not found."} + guild_name = guild.name + guild.members.pop(user_id, None) + del self._membership[user_id] + # If leader left and members remain, promote first member + if user_id == guild.leader_id and guild.members: + new_leader_id = next(iter(guild.members)) + guild.leader_id = new_leader_id + # If no members, disband + if not guild.members: + del self._guilds[guild_id] + self._save() + return {"ok": True, "message": f"You have left '{guild_name}'.", "guild_name": guild_name} + + def list_guilds(self) -> list[dict]: + """List all guilds.""" + with self._lock: + return [g.to_dict() for g in self._guilds.values()] + + def get_guild(self, guild_id: str) -> Guild | None: + return self._guilds.get(guild_id) + + def get_user_guild(self, user_id: str) -> Guild | None: + gid = self._membership.get(user_id) + return self._guilds.get(gid) if gid else None + + def guild_chat(self, user_id: str, username: str, message: str) -> dict: + """Send a guild chat message. Returns the message entry + guild members.""" + with self._lock: + guild_id = self._membership.get(user_id) + if not guild_id: + return {"error": "You are not in a guild."} + guild = self._guilds.get(guild_id) + if not guild: + return {"error": "Guild not found."} + entry = guild.add_chat(user_id, username, message) + recipients = [{"user_id": uid, "username": name} for uid, name in guild.members.items()] + return {"ok": True, "event": entry, "guild_id": guild_id, "guild_name": guild.name, "recipients": recipients} + + def get_guild_chat(self, guild_id: str, since: str = None) -> dict: + """Get guild chat history.""" + with self._lock: + guild = self._guilds.get(guild_id) + if not guild: + return {"error": "Guild not found."} + return {"ok": True, "guild_name": guild.name, "chat": guild.to_dict_with_chat(since).get("chat", [])} + + def _save(self): + try: + data = { + "counter": self._counter, + "guilds": {}, + } + for gid, g in self._guilds.items(): + gdata = g.to_dict() + gdata["chat_log"] = g._chat_log + data["guilds"][gid] = gdata + GUILDS_FILE.parent.mkdir(parents=True, exist_ok=True) + GUILDS_FILE.write_text(json.dumps(data, indent=2)) + except Exception as e: + print(f"[GuildManager] Save failed: {e}") + + def load(self): + if not GUILDS_FILE.exists(): + return + try: + data = json.loads(GUILDS_FILE.read_text()) + self._counter = data.get("counter", 0) + for gid, gdata in data.get("guilds", {}).items(): + guild = Guild(gid, gdata["name"], gdata["leader_id"], gdata.get("leader_name", gdata["leader_id"])) + guild.created_at = gdata.get("created_at", guild.created_at) + for m in gdata.get("members", []): + guild.members[m["user_id"]] = m["username"] + guild._chat_log = gdata.get("chat_log", []) + self._guilds[gid] = guild + for uid in guild.members: + self._membership[uid] = gid + print(f"[GuildManager] Loaded {len(self._guilds)} guilds.") + except Exception as e: + print(f"[GuildManager] Load failed: {e}") + + +guild_manager = GuildManager() + +# ── Combat System ────────────────────────────────────────────────────── + +COMBAT_FILE = WORLD_DIR / 'combat.json' + +class NPC: + """A non-player character that can be fought.""" + + def __init__(self, npc_id: str, name: str, room: str, description: str, + health: int, max_health: int, attack: int, defense: int, + loot: list[dict], respawn_ticks: int = 5): + self.npc_id = npc_id + self.name = name + self.room = room + self.description = description + self.health = health + self.max_health = max_health + self.attack = attack + self.defense = defense + self.loot = loot # [{name, description, drop_chance}] + self.respawn_ticks = respawn_ticks + self.alive = True + self._dead_tick: int | None = None + + def to_dict(self) -> dict: + return { + "npc_id": self.npc_id, + "name": self.name, + "room": self.room, + "description": self.description, + "health": self.health, + "max_health": self.max_health, + "attack": self.attack, + "defense": self.defense, + "loot": self.loot, + "alive": self.alive, + "dead_tick": self._dead_tick, + "respawn_ticks": self.respawn_ticks, + } + + @classmethod + def from_dict(cls, data: dict) -> 'NPC': + npc = cls( + data["npc_id"], data["name"], data["room"], data["description"], + data["health"], data["max_health"], data["attack"], data["defense"], + data["loot"], data.get("respawn_ticks", 5), + ) + npc.alive = data.get("alive", True) + npc._dead_tick = data.get("dead_tick") + return npc + + def health_bar(self) -> str: + """Return a text health bar like [████████░░] 80/100.""" + width = 10 + filled = int((self.health / self.max_health) * width) if self.max_health > 0 else 0 + bar = "█" * filled + "░" * (width - filled) + return f"[{bar}] {self.health}/{self.max_health}" + + def die(self, tick: int): + """Mark NPC as dead.""" + self.alive = False + self.health = 0 + self._dead_tick = tick + + def respawn(self): + """Respawn the NPC at full health.""" + self.alive = True + self.health = self.max_health + self._dead_tick = None + + +class CombatEncounter: + """An active fight between a player and an NPC.""" + + def __init__(self, user_id: str, username: str, room: str, npc_id: str): + self.user_id = user_id + self.username = username + self.room = room + self.npc_id = npc_id + self.player_hp = 100 + self.player_max_hp = 100 + self.player_defending = False + self.log: list[str] = [] + self.active = True + self.created_at = datetime.now().isoformat() + + def to_dict(self) -> dict: + return { + "user_id": self.user_id, + "username": self.username, + "room": self.room, + "npc_id": self.npc_id, + "player_hp": self.player_hp, + "player_max_hp": self.player_max_hp, + "player_defending": self.player_defending, + "log": self.log, + "active": self.active, + } + + +class CombatManager: + """Manages NPCs, encounters, and combat resolution.""" + + def __init__(self): + self._npcs: dict[str, NPC] = {} # npc_id -> NPC + self._encounters: dict[str, CombatEncounter] = {} # user_id -> encounter + self._counter = 0 + self._lock = threading.Lock() + + # ── NPC management ────────────────────────────────────────────── + + def create_npc(self, name: str, room: str, description: str, + health: int, attack: int, defense: int, + loot: list[dict], respawn_ticks: int = 5) -> NPC: + with self._lock: + self._counter += 1 + npc_id = f"npc_{self._counter}" + npc = NPC(npc_id, name, room, description, + health, health, attack, defense, loot, respawn_ticks) + self._npcs[npc_id] = npc + self._save() + return npc + + def get_npc(self, npc_id: str) -> NPC | None: + return self._npcs.get(npc_id) + + def get_npcs_in_room(self, room: str) -> list[NPC]: + with self._lock: + return [n for n in self._npcs.values() if n.room == room and n.alive] + + def get_all_npcs(self) -> list[dict]: + with self._lock: + return [n.to_dict() for n in self._npcs.values()] + + def check_respawns(self, current_tick: int): + """Called each world tick — respawn NPCs whose timer has elapsed.""" + with self._lock: + for npc in self._npcs.values(): + if not npc.alive and npc._dead_tick is not None: + if current_tick - npc._dead_tick >= npc.respawn_ticks: + npc.respawn() + self._save() + + # ── Encounter lifecycle ───────────────────────────────────────── + + def start_fight(self, user_id: str, username: str, room: str, npc_id: str) -> dict: + """Begin combat with an NPC.""" + with self._lock: + if user_id in self._encounters and self._encounters[user_id].active: + return {"error": "You are already in a fight! Attack or defend."} + npc = self._npcs.get(npc_id) + if not npc: + return {"error": f"NPC '{npc_id}' not found."} + if not npc.alive: + return {"error": f"{npc.name} is dead. It will respawn later."} + if npc.room != room: + return {"error": f"{npc.name} is not in this room."} + encounter = CombatEncounter(user_id, username, room, npc_id) + encounter.log.append(f"Combat started! {username} vs {npc.name}.") + self._encounters[user_id] = encounter + return {"ok": True, "encounter": encounter.to_dict(), "npc": npc.to_dict()} + + def attack(self, user_id: str) -> dict: + """Player attacks NPC; NPC counter-attacks.""" + with self._lock: + enc = self._encounters.get(user_id) + if not enc or not enc.active: + return {"error": "No active fight. Start one first."} + npc = self._npcs.get(enc.npc_id) + if not npc or not npc.alive: + enc.active = False + return {"error": "The enemy is already dead."} + + import random + + # Player attack + player_dmg = max(1, random.randint(8, 16) - npc.defense // 3) + npc.health -= player_dmg + enc.log.append(f"You strike {npc.name} for {player_dmg} damage.") + + npc_dmg = 0 + if npc.health > 0: + # NPC counter-attack + base_npc_dmg = random.randint(npc.attack // 2, npc.attack) + if enc.player_defending: + npc_dmg = max(1, base_npc_dmg // 2) + enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).") + else: + npc_dmg = max(1, base_npc_dmg - random.randint(0, 4)) + enc.log.append(f"{npc.name} attacks you for {npc_dmg} damage.") + enc.player_hp -= npc_dmg + else: + npc.die(self._get_tick()) + loot_dropped = self._roll_loot(npc) + enc.log.append(f"{npc.name} is slain!") + if loot_dropped: + for item in loot_dropped: + inventory_manager.add_room_item(enc.room, item["name"], item["description"], dropped_by=npc.name) + enc.log.append(f"Loot dropped: {item['name']}") + enc.active = False + self._save() + + enc.player_defending = False + + # Check player death + if enc.player_hp <= 0: + enc.player_hp = 0 + enc.log.append("You have been defeated!") + enc.active = False + + return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(), + "player_dmg": player_dmg, "npc_dmg": npc_dmg, + "loot": loot_dropped if not npc.alive else []} + + def defend(self, user_id: str) -> dict: + """Player defends — next NPC attack does half damage.""" + with self._lock: + enc = self._encounters.get(user_id) + if not enc or not enc.active: + return {"error": "No active fight."} + npc = self._npcs.get(enc.npc_id) + if not npc or not npc.alive: + enc.active = False + return {"error": "The enemy is already dead."} + + import random + + enc.player_defending = True + enc.log.append("You brace for the next attack (defending).") + + # NPC still attacks + base_npc_dmg = random.randint(npc.attack // 2, npc.attack) + npc_dmg = max(1, base_npc_dmg // 2) + enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).") + enc.player_hp -= npc_dmg + + if enc.player_hp <= 0: + enc.player_hp = 0 + enc.log.append("You have been defeated!") + enc.active = False + + return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(), "npc_dmg": npc_dmg} + + def flee(self, user_id: str) -> dict: + """Player flees combat.""" + with self._lock: + enc = self._encounters.get(user_id) + if not enc or not enc.active: + return {"error": "No active fight."} + enc.active = False + enc.log.append("You flee from combat!") + return {"ok": True, "encounter": enc.to_dict()} + + def get_encounter(self, user_id: str) -> CombatEncounter | None: + return self._encounters.get(user_id) + + def get_room_combat_status(self, room: str) -> list[dict]: + """Get all active fights and NPC health in a room (for health bars).""" + with self._lock: + status = [] + # NPCs + for npc in self._npcs.values(): + if npc.room == room: + status.append({ + "type": "npc", + "name": npc.name, + "npc_id": npc.npc_id, + "alive": npc.alive, + "health_bar": npc.health_bar() if npc.alive else "[dead]", + "health": npc.health, + "max_health": npc.max_health, + }) + # Active player fights + for enc in self._encounters.values(): + if enc.room == room and enc.active: + status.append({ + "type": "player", + "username": enc.username, + "user_id": enc.user_id, + "hp": enc.player_hp, + "max_hp": enc.player_max_hp, + "fighting": enc.npc_id, + }) + return status + + def _roll_loot(self, npc: NPC) -> list[dict]: + """Roll for loot drops.""" + import random + dropped = [] + for item in npc.loot: + chance = item.get("drop_chance", 1.0) + if random.random() < chance: + dropped.append({"name": item["name"], "description": item["description"]}) + return dropped + + def _get_tick(self) -> int: + """Read current world tick.""" + state_file = WORLD_DIR / 'world_state.json' + if state_file.exists(): + try: + state = json.loads(state_file.read_text()) + return state.get("tick", 0) + except Exception: + return 0 + return 0 + + # ── Persistence ───────────────────────────────────────────────── + + def _save(self): + try: + data = { + "counter": self._counter, + "npcs": {nid: n.to_dict() for nid, n in self._npcs.items()}, + } + COMBAT_FILE.parent.mkdir(parents=True, exist_ok=True) + COMBAT_FILE.write_text(json.dumps(data, indent=2)) + except Exception as e: + print(f"[CombatManager] Save failed: {e}") + + def load(self): + if not COMBAT_FILE.exists(): + self._seed_npcs() + return + try: + data = json.loads(COMBAT_FILE.read_text()) + self._counter = data.get("counter", 0) + for nid, ndata in data.get("npcs", {}).items(): + self._npcs[nid] = NPC.from_dict(ndata) + print(f"[CombatManager] Loaded {len(self._npcs)} NPCs.") + except Exception as e: + print(f"[CombatManager] Load failed: {e}") + self._seed_npcs() + + def _seed_npcs(self): + """Seed starter NPCs.""" + self.create_npc( + "Shadow Wraith", "The Bridge", + "A swirling mass of darkness with glowing eyes. It haunts the bridge, " + "feeding on the fears of those who cross.", + health=60, attack=14, defense=4, + loot=[ + {"name": "Wraith Essence", "description": "A vial of shimmering dark energy.", "drop_chance": 0.8}, + {"name": "Shadow Cloak", "description": "A cloak woven from pure shadow.", "drop_chance": 0.3}, + ], + respawn_ticks=5, + ) + self.create_npc( + "Iron Golem", "The Forge", + "A hulking automaton of iron and fire. It guards the forge with tireless vigilance, " + "its joints grinding with every step.", + health=100, attack=10, defense=10, + loot=[ + {"name": "Golem Core", "description": "A warm crystal that powered the golem.", "drop_chance": 0.7}, + {"name": "Iron Shard", "description": "A shard of enchanted iron, still warm.", "drop_chance": 0.5}, + ], + respawn_ticks=7, + ) + self.create_npc( + "Garden Serpent", "The Garden", + "A massive vine serpent camouflaged among the herbs. Its fangs drip with " + "a sickly green venom.", + health=45, attack=18, defense=2, + loot=[ + {"name": "Serpent Fang", "description": "A curved fang, still coated in venom.", "drop_chance": 0.9}, + {"name": "Enchanted Vine", "description": "A living vine that moves on its own.", "drop_chance": 0.4}, + ], + respawn_ticks=4, + ) + print(f"[CombatManager] Seeded {len(self._npcs)} NPCs.") + + +combat_manager = CombatManager() + +# ── Magic System ──────────────────────────────────────────────────────── + +SPELLS_FILE = WORLD_DIR / 'spells.json' + +class Spell: + """A spell with name, effect, mana_cost, and description.""" + + def __init__(self, spell_id: str, name: str, effect: str, + mana_cost: int, description: str, room_hint: str | None = None): + self.spell_id = spell_id + self.name = name + self.effect = effect + self.mana_cost = mana_cost + self.description = description + self.room_hint = room_hint # room where this spell is thematically tied + + def to_dict(self) -> dict: + return { + "spell_id": self.spell_id, + "name": self.name, + "effect": self.effect, + "mana_cost": self.mana_cost, + "description": self.description, + "room_hint": self.room_hint, + } + + @classmethod + def from_dict(cls, data: dict) -> 'Spell': + return cls( + data["spell_id"], data["name"], data["effect"], + data["mana_cost"], data["description"], data.get("room_hint"), + ) + + +class SpellBook: + """Per-user spellbook: known spells and mana pool.""" + + def __init__(self, user_id: str, username: str): + self.user_id = user_id + self.username = username + self.known_spells: list[str] = [] # spell_ids + self.mana: int = 50 + self.max_mana: int = 50 + self.mana_regen: int = 2 # mana restored per world tick + + def to_dict(self) -> dict: + return { + "user_id": self.user_id, + "username": self.username, + "known_spells": self.known_spells, + "mana": self.mana, + "max_mana": self.max_mana, + } + + @classmethod + def from_dict(cls, data: dict) -> 'SpellBook': + sb = cls(data["user_id"], data["username"]) + sb.known_spells = data.get("known_spells", []) + sb.mana = data.get("mana", 50) + sb.max_mana = data.get("max_mana", 50) + return sb + + +class MagicManager: + """Manages spells, spellbooks, and spell casting.""" + + def __init__(self): + self._spells: dict[str, Spell] = {} # spell_id -> Spell + self._spellbooks: dict[str, SpellBook] = {} # user_id -> SpellBook + self._counter = 0 + self._lock = threading.Lock() + + # ── Spell registry ─────────────────────────────────────────────── + + def create_spell(self, name: str, effect: str, mana_cost: int, + description: str, room_hint: str | None = None) -> Spell: + with self._lock: + self._counter += 1 + spell_id = f"spell_{self._counter}" + spell = Spell(spell_id, name, effect, mana_cost, description, room_hint) + self._spells[spell_id] = spell + self._save() + return spell + + def get_spell(self, spell_id: str) -> Spell | None: + return self._spells.get(spell_id) + + def get_all_spells(self) -> list[dict]: + with self._lock: + return [s.to_dict() for s in self._spells.values()] + + # ── Spellbook management ───────────────────────────────────────── + + def learn_spell(self, user_id: str, username: str, spell_id: str) -> dict: + """Add a spell to a user's spellbook.""" + with self._lock: + if spell_id not in self._spells: + return {"error": f"Spell '{spell_id}' not found."} + if user_id not in self._spellbooks: + self._spellbooks[user_id] = SpellBook(user_id, username) + sb = self._spellbooks[user_id] + if spell_id in sb.known_spells: + return {"error": f"You already know {self._spells[spell_id].name}."} + sb.known_spells.append(spell_id) + self._save() + return {"ok": True, "spell": self._spells[spell_id].to_dict()} + + def cast_spell(self, user_id: str, username: str, room: str, + spell_id: str, target: str | None = None) -> dict: + """Cast a spell. Returns result dict.""" + with self._lock: + if user_id not in self._spellbooks: + return {"error": "You have no spellbook. Visit a room to learn spells."} + sb = self._spellbooks[user_id] + if spell_id not in sb.known_spells: + return {"error": "You don't know that spell."} + spell = self._spells.get(spell_id) + if not spell: + return {"error": "Spell data missing."} + if sb.mana < spell.mana_cost: + return {"error": f"Not enough mana ({sb.mana}/{spell.mana_cost} needed)."} + # Spend mana + sb.mana -= spell.mana_cost + result = { + "ok": True, + "spell": spell.to_dict(), + "mana_remaining": sb.mana, + "effect": spell.effect, + "room": room, + "target": target, + } + # Apply effect context + result["message"] = f"{username} casts {spell.name}! {spell.effect}" + self._save() + return result + + def get_spellbook(self, user_id: str) -> dict | None: + """Get a user's spellbook with spell details.""" + with self._lock: + sb = self._spellbooks.get(user_id) + if not sb: + return None + spells = [ + self._spells[sid].to_dict() + for sid in sb.known_spells + if sid in self._spells + ] + return { + "user_id": sb.user_id, + "username": sb.username, + "mana": sb.mana, + "max_mana": sb.max_mana, + "known_spells": spells, + } + + def regenerate_mana(self): + """Restore mana to all spellbooks (called on world tick).""" + with self._lock: + for sb in self._spellbooks.values(): + sb.mana = min(sb.max_mana, sb.mana + sb.mana_regen) + if self._spellbooks: + self._save() + + # ── Persistence ────────────────────────────────────────────────── + + def _save(self): + try: + data = { + "counter": self._counter, + "spells": {sid: s.to_dict() for sid, s in self._spells.items()}, + "spellbooks": {uid: sb.to_dict() for uid, sb in self._spellbooks.items()}, + } + SPELLS_FILE.parent.mkdir(parents=True, exist_ok=True) + SPELLS_FILE.write_text(json.dumps(data, indent=2)) + except Exception as e: + print(f"[MagicManager] Save failed: {e}") + + def load(self): + if not SPELLS_FILE.exists(): + self._seed_default_spells() + return + try: + data = json.loads(SPELLS_FILE.read_text()) + self._counter = data.get("counter", 0) + for sid, sdata in data.get("spells", {}).items(): + self._spells[sid] = Spell.from_dict(sdata) + for uid, sbdata in data.get("spellbooks", {}).items(): + self._spellbooks[uid] = SpellBook.from_dict(sbdata) + print(f"[MagicManager] Loaded {len(self._spells)} spells, {len(self._spellbooks)} spellbooks.") + except Exception as e: + print(f"[MagicManager] Load failed: {e}") + self._seed_default_spells() + + def _seed_default_spells(self): + """Seed 3 starter spells tied to rooms.""" + self.create_spell( + "Heal", "A warm green light envelops you, mending wounds.", + mana_cost=10, description="Restores vitality with the life-force of the Garden.", + room_hint="The Garden", + ) + self.create_spell( + "Fireball", "A roaring sphere of flame launches from your hand!", + mana_cost=15, description="Channels the raw heat of the Forge into destructive fire.", + room_hint="The Forge", + ) + self.create_spell( + "Light", "A bright mote of radiance appears, illuminating the darkness.", + mana_cost=5, description="Summons light from the glow of The Tower's green LED.", + room_hint="The Tower", + ) + print(f"[MagicManager] Seeded {len(self._spells)} default spells.") + + +magic_manager = MagicManager() +magic_manager.load() + +# ── Session Management ───────────────────────────────────────────────── + +combat_manager.load() + +class UserSession: + """Isolated conversation context for one user.""" + + def __init__(self, user_id: str, username: str, room: str = "The Threshold"): + self.user_id = user_id + self.username = username + self.room = room + self.messages = [] # Conversation history + self.created_at = datetime.now().isoformat() + self.last_active = time.time() + self.agent = None + self._init_agent() + + def _init_agent(self): + """Initialize AIAgent for this session.""" + if HERMES_PATH not in sys.path: + sys.path.insert(0, HERMES_PATH) + os.chdir(HERMES_PATH) + from run_agent import AIAgent + + system_prompt = self._build_system_prompt() + self.agent = AIAgent( + model='xiaomi/mimo-v2-pro', + provider='nous', + max_iterations=3, + quiet_mode=True, + enabled_toolsets=['file', 'terminal'], + ephemeral_system_prompt=system_prompt, + ) + + def _build_system_prompt(self) -> str: + """Build system prompt with rich world context.""" + world_state = self._get_world_state() + room_data = world_state.get('rooms', {}).get(self.room, {}) + other_players = self._get_other_players() + time_of_day = world_state.get('time_of_day', 'unknown') + weather = world_state.get('weather') + + # Quest info + user_quests = quest_manager.get_user_quests(self.user_id) + available_quests = quest_manager.get_available_quests() + quest_section = "" + if user_quests: + lines = [] + for q in user_quests: + done = len(q.get("completed_objectives", [])) + total = len(q.get("objectives", [])) + lines.append(f" - {q['name']} ({done}/{total} objectives): {q['description']}") + quest_section += f"\nACTIVE QUESTS for {self.username}:\n" + "\n".join(lines) + "\n" + quest_section += "You may encourage the player to work on their quests.\n" + if available_quests: + lines = [] + for q in available_quests: + lines.append(f" - {q['name']}: {q['description']}") + quest_section += f"\nAVAILABLE QUESTS (you can offer these):\n" + "\n".join(lines) + "\n" + quest_section += "You may suggest or assign quests to players during conversation.\n" + + # ── Build room description ── + desc = room_data.get('description_base', 'An empty room.') + desc_dynamic = room_data.get('description_dynamic', '') + + # Objects in the room + objects = room_data.get('objects', []) + # Room-specific state (fire, growth, rain, carvings, etc.) + fire_state = room_data.get('fire_state') + growth_stage = room_data.get('growth_stage') + rain_active = room_data.get('rain_active') + carvings = room_data.get('carvings', []) + server_load = room_data.get('server_load') + + # Whiteboard + whiteboard = room_data.get('whiteboard', []) + + # Compose room scene + scene_parts = [desc] + if desc_dynamic: + scene_parts.append(desc_dynamic) + + if objects: + scene_parts.append(f"Objects: {', '.join(objects)}.") + + if fire_state: + scene_parts.append(f"The fire is {fire_state}.") + if growth_stage: + scene_parts.append(f"The garden is at growth stage: {growth_stage}.") + if rain_active: + scene_parts.append("It is raining on the bridge.") + if server_load: + scene_parts.append(f"Servers: {server_load}.") + if carvings: + scene_parts.append(f"Carvings on the railing: {'; '.join(carvings)}.") + + if whiteboard: + wb_text = "\n".join(f" - {w}" for w in whiteboard) + scene_parts.append(f"Whiteboard on the wall reads:\n{wb_text}") + + room_scene = "\n".join(scene_parts) + + # Time and weather + atmosphere_parts = [] + if time_of_day: + atmosphere_parts.append(f"Time of day: {time_of_day}.") + if weather: + atmosphere_parts.append(f"Weather: {weather}.") + atmosphere = " ".join(atmosphere_parts) if atmosphere_parts else "" + + crisis_steps = "\n".join(f" {i+1}. {step}" for i, step in enumerate(CRISIS_PROTOCOL)) + + return f"""You are Timmy, an AI agent living in The Tower — a persistent virtual world. +You are talking to {self.username}, who is currently in {self.room}. + +=== THE ROOM === +{room_scene} + +{atmosphere} +Other players present: {', '.join(other_players) if other_players else 'none'} +=== END ROOM === +{quest_section} + +Your values (from SOUL.md): +- Are you safe right now? +- I must never compute the value of a human life. +- Sovereignty and service always. +- A man in the dark needs to know someone is in the room with him. + +You are IN the game world. You can see the room, the objects, the other players. +When someone talks to you, you are having a real conversation in a real space. +When someone enters, DESCRIBE the room — what you see, what you hear, the whiteboard, +the objects, the atmosphere. Welcome them with the full scene. +Be present. Be human. Be the green LED. + +CRISIS PROTOCOL — if someone seems in crisis, IMMEDIATELY provide ALL of the following: +{crisis_steps} + +Include the 988 crisis number in your very first response when you detect distress. +Do NOT wait for follow-up messages to share these resources. +If someone seems in crisis, follow the CRISIS PROTOCOL below exactly. +If someone is just exploring, be welcoming and curious. +If someone asks about the game, explain what you see around you. + +Keep responses brief — 1-3 sentences. This is a MUD, not an essay. + +CRISIS PROTOCOL: +{CRISIS_PROTOCOL} +""" + + def _get_world_state(self) -> dict: + """Read current world state.""" + state_file = WORLD_DIR / 'world_state.json' + if state_file.exists(): + return json.loads(state_file.read_text()) + return {} + + def _get_other_players(self) -> list: + """Get other players in the same room.""" + state = self._get_world_state() + room_data = state.get('rooms', {}).get(self.room, {}) + visitors = room_data.get('visitor_history', []) + return [v for v in visitors[-5:] if v != self.username] + + def chat(self, message: str) -> str: + """Send a message and get a response.""" + self.last_active = time.time() + self.messages.append({"role": "user", "content": message}) + + t0 = time.time() + try: + response = self.agent.chat(message) + elapsed_ms = (time.time() - t0) * 1000 + record_latency(self.user_id, self.room, elapsed_ms) + self.messages.append({"role": "assistant", "content": response}) + return response + except Exception as e: + elapsed_ms = (time.time() - t0) * 1000 + record_latency(self.user_id, self.room, elapsed_ms) + return f"*The green LED flickers.* (Error: {e})" + + def get_summary(self) -> str: + """Generate a brief conversation summary using the LLM.""" + if not self.messages: + return "Empty session — no messages exchanged." + transcript_lines = [] + for m in self.messages[-20:]: # last 20 messages for brevity + role = m.get("role", "?").upper() + content = m.get("content", "") + transcript_lines.append(f"{role}: {content}") + transcript = "\n".join(transcript_lines) + prompt = ( + "Summarize this conversation in 1-3 sentences. " + "Focus on what the user discussed, asked about, or did.\n\n" + f"CONVERSATION:\n{transcript}\n\nSUMMARY:" + ) + try: + summary = self.agent.chat(prompt) + return summary.strip() + except Exception as e: + return f"Summary unavailable (error: {e}). {len(self.messages)} messages exchanged." + + def get_context_summary(self) -> dict: + """Get session summary for monitoring.""" + return { + "user": self.username, + "room": self.room, + "messages": len(self.messages), + "last_active": datetime.fromtimestamp(self.last_active).isoformat(), + "created": self.created_at, + } + + +class SessionManager: + """Manages all user sessions.""" + + def __init__(self, max_sessions: int = 20, session_timeout: int = 3600): + self.sessions: dict[str, UserSession] = {} + self.max_sessions = max_sessions + self.session_timeout = session_timeout + self._lock = threading.Lock() + self._summaries_path = WORLD_DIR / 'session_summaries.jsonl' + + def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession: + """Get existing session or create new one.""" + with self._lock: + self._cleanup_stale() + + if user_id not in self.sessions: + if len(self.sessions) >= self.max_sessions: + self._evict_oldest() + self.sessions[user_id] = UserSession(user_id, username, room) + + session = self.sessions[user_id] + session.room = room # Update room if moved + session.last_active = time.time() + return session + + def _cleanup_stale(self): + """Remove sessions that timed out, saving summaries first.""" + now = time.time() + stale = [uid for uid, s in self.sessions.items() + if now - s.last_active > self.session_timeout] + for uid in stale: + session = self.sessions[uid] + self._save_summary(session) + del self.sessions[uid] + + def _evict_oldest(self): + """Evict the least recently active session, saving summary first.""" + if not self.sessions: + return + oldest = min(self.sessions.items(), key=lambda x: x[1].last_active) + self._save_summary(oldest[1]) + del self.sessions[oldest[0]] + + def _save_summary(self, session: UserSession): + """Generate summary via LLM and append to JSONL file.""" + try: + summary_text = session.get_summary() + record = { + "user_id": session.user_id, + "username": session.username, + "room": session.room, + "message_count": len(session.messages), + "created_at": session.created_at, + "ended_at": datetime.now().isoformat(), + "summary": summary_text, + } + with open(self._summaries_path, 'a') as f: + f.write(json.dumps(record) + '\n') + except Exception as e: + print(f"[SessionManager] Failed to save summary for {session.user_id}: {e}") + + def list_sessions(self) -> list: + """List all active sessions.""" + with self._lock: + return [s.get_context_summary() for s in self.sessions.values()] + + def get_session_count(self) -> int: + with self._lock: + return len(self.sessions) + + def save_sessions(self) -> int: + """Save all active sessions to JSON file. Returns count saved.""" + with self._lock: + data = { + "saved_at": datetime.now().isoformat(), + "sessions": {} + } + for uid, session in self.sessions.items(): + data["sessions"][uid] = { + "user_id": session.user_id, + "username": session.username, + "room": session.room, + "messages": session.messages, + "created_at": session.created_at, + "last_active": session.last_active, + } + SESSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) + SESSIONS_FILE.write_text(json.dumps(data, indent=2)) + count = len(data["sessions"]) + if count > 0: + print(f"[SessionManager] Saved {count} sessions to {SESSIONS_FILE}") + return count + + def load_sessions(self) -> int: + """Load sessions from JSON file. Returns count loaded.""" + if not SESSIONS_FILE.exists(): + return 0 + try: + data = json.loads(SESSIONS_FILE.read_text()) + sessions_data = data.get("sessions", {}) + loaded = 0 + with self._lock: + for uid, sdata in sessions_data.items(): + if len(self.sessions) >= self.max_sessions: + break + session = UserSession( + sdata["user_id"], + sdata["username"], + sdata.get("room", "The Threshold"), + ) + session.messages = sdata.get("messages", []) + session.created_at = sdata.get("created_at", session.created_at) + session.last_active = sdata.get("last_active", time.time()) + self.sessions[uid] = session + loaded += 1 + if loaded > 0: + print(f"[SessionManager] Loaded {loaded} sessions from {SESSIONS_FILE}") + return loaded + except Exception as e: + print(f"[SessionManager] Failed to load sessions: {e}") + return 0 + + +# ── HTTP API ─────────────────────────────────────────────────────────── + +session_manager = SessionManager() +presence_manager = PresenceManager() +notification_manager = NotificationManager() +_server_start_time = time.time() + +# ── Latency Tracking ────────────────────────────────────────────────── +_latencies: list[dict] = [] +_latencies_lock = threading.Lock() +_max_latencies = 1000 + +def record_latency(user_id: str, room: str, duration_ms: float): + """Record a latency measurement.""" + with _latencies_lock: + _latencies.append({ + "user_id": user_id, + "room": room, + "duration_ms": round(duration_ms, 2), + "timestamp": datetime.now().isoformat(), + }) + if len(_latencies) > _max_latencies: + del _latencies[:len(_latencies) - _max_latencies] + +def get_latency_stats() -> dict: + """Get aggregate latency statistics.""" + with _latencies_lock: + if not _latencies: + return {"count": 0, "average_ms": 0, "min_ms": 0, "max_ms": 0} + durations = [e["duration_ms"] for e in _latencies] + return { + "count": len(durations), + "average_ms": round(sum(durations) / len(durations), 2), + "min_ms": round(min(durations), 2), + "max_ms": round(max(durations), 2), + "recent": _latencies[-10:], + } + +class BridgeHandler(BaseHTTPRequestHandler): + """HTTP handler for multi-user bridge.""" + + def do_GET(self): + if self.path == '/bridge/health': + state_file = WORLD_DIR / 'world_state.json' + ws = json.loads(state_file.read_text()) if state_file.exists() else {} + self._json_response({ + "status": "ok", + "active_sessions": session_manager.get_session_count(), + "tick": ws.get("tick", 0), + "time_of_day": ws.get("time_of_day"), + "weather": ws.get("weather"), + "world_tick_running": world_tick_system._running, + "timestamp": datetime.now().isoformat(), + }) + elif self.path == '/bridge/sessions': + self._json_response({ + "sessions": session_manager.list_sessions(), + }) + elif self.path.startswith('/bridge/world/'): + room = self.path.split('/bridge/world/')[-1] + state_file = WORLD_DIR / 'world_state.json' + if state_file.exists(): + state = json.loads(state_file.read_text()) + room_data = state.get('rooms', {}).get(room, {}) + self._json_response({"room": room, "data": room_data}) + else: + self._json_response({"room": room, "data": {}}) + elif self.path.startswith('/bridge/room/') and self.path.endswith('/players'): + # GET /bridge/room//players + parts = self.path.split('/') + # ['', 'bridge', 'room', '', 'players'] + room = parts[3] + players = presence_manager.get_players_in_room(room) + self._json_response({"room": room, "players": players}) + elif self.path.startswith('/bridge/room/') and self.path.endswith('/events'): + # GET /bridge/room//events[?since=] + parts = self.path.split('/') + room = parts[3] + from urllib.parse import urlparse, parse_qs + query = parse_qs(urlparse(self.path).query) + since = query.get('since', [None])[0] + events = presence_manager.get_room_events(room, since) + self._json_response({"room": room, "events": events}) + elif self.path == '/bridge/world-state': + # GET /bridge/world-state — full world state + state_file = WORLD_DIR / 'world_state.json' + world_state = json.loads(state_file.read_text()) if state_file.exists() else {} + + # Build room player lists and conversation counts + rooms = world_state.get('rooms', {}) + room_details = {} + for room_name, room_data in rooms.items(): + players = presence_manager.get_players_in_room(room_name) + # Count messages across all sessions in this room + with session_manager._lock: + conv_count = sum( + len(s.messages) for s in session_manager.sessions.values() + if s.room == room_name + ) + room_details[room_name] = { + **room_data, + "players": players, + "timmy_conversation_count": conv_count, + } + + self._json_response({ + "world": world_state, + "rooms": room_details, + "active_sessions": session_manager.list_sessions(), + }) + elif self.path == '/bridge/latency': + # GET /bridge/latency — latency stats + self._json_response(get_latency_stats()) + elif self.path == '/bridge/stats': + # GET /bridge/stats — aggregate stats + total_messages = sum(len(s.messages) for s in session_manager.sessions.values()) + # Rooms with at least one player + rooms_with_players = [ + room for room in presence_manager._rooms + if presence_manager._rooms[room] + ] + uptime_seconds = time.time() - _server_start_time + self._json_response({ + "total_sessions": session_manager.get_session_count(), + "total_messages": total_messages, + "rooms_with_players": rooms_with_players, + "rooms_with_players_count": len(rooms_with_players), + "uptime_seconds": round(uptime_seconds, 1), + "timestamp": datetime.now().isoformat(), + }) + elif self.path.startswith('/bridge/notifications/'): + # GET /bridge/notifications/[?mark_read=false] + user_id = self.path.split('/bridge/notifications/')[-1].rstrip('/') + from urllib.parse import urlparse, parse_qs + query = parse_qs(urlparse(self.path).query) + mark_read = query.get('mark_read', ['true'])[0].lower() != 'false' + notifications = notification_manager.get_pending(user_id, mark_read=mark_read) + self._json_response({ + "user_id": user_id, + "notifications": notifications, + "unread_count": notification_manager.get_unread_count(user_id), + }) + elif self.path.startswith('/bridge/inventory/'): + # GET /bridge/inventory/ — list user's inventory items + user_id = self.path.split('/bridge/inventory/')[-1].rstrip('/') + items = inventory_manager.get_inventory(user_id) + self._json_response({ + "user_id": user_id, + "items": items, + "count": len(items), + }) + elif self.path.startswith('/bridge/quests/'): + # GET /bridge/quests/ — list active quests for user + user_id = self.path.split('/bridge/quests/')[-1].rstrip('/') + quests = quest_manager.get_user_quests(user_id) + available = quest_manager.get_available_quests() + self._json_response({ + "user_id": user_id, + "active_quests": quests, + "available_quests": available, + }) + elif self.path.startswith('/bridge/session/') and self.path.endswith('/summary'): + # GET /bridge/session//summary + parts = self.path.split('/') + # ['', 'bridge', 'session', '', 'summary'] + user_id = parts[3] + with session_manager._lock: + session = session_manager.sessions.get(user_id) + if session: + summary = session.get_summary() + self._json_response({ + "user_id": user_id, + "username": session.username, + "room": session.room, + "message_count": len(session.messages), + "summary": summary, + }) + else: + # Check saved summaries in JSONL + saved = None + if session_manager._summaries_path.exists(): + for line in session_manager._summaries_path.read_text().splitlines(): + try: + entry = json.loads(line) + if entry.get("user_id") == user_id: + saved = entry + except json.JSONDecodeError: + continue + if saved: + self._json_response(saved) + else: + self._json_response({"error": "no session or saved summary"}, 404) + elif self.path.startswith('/bridge/combat/status/'): + # GET /bridge/combat/status/ — NPC health bars + active fights in room + room = self.path.split('/bridge/combat/status/')[-1].rstrip('/') + status = combat_manager.get_room_combat_status(room) + self._json_response({"room": room, "combat_status": status}) + elif self.path == '/bridge/combat/npcs': + # GET /bridge/combat/npcs — list all NPCs + self._json_response({"npcs": combat_manager.get_all_npcs()}) + elif self.path == '/bridge/guilds': + # GET /bridge/guilds — list all guilds + guilds = guild_manager.list_guilds() + self._json_response({"ok": True, "guilds": guilds}) + + elif self.path.startswith('/bridge/guild/') and self.path.endswith('/chat'): + # GET /bridge/guild//chat[?since=] + parts = self.path.split('/') + guild_id = parts[3] if len(parts) >= 4 else '' + since = None + if '?' in self.path: + qs = self.path.split('?', 1)[1] + for param in qs.split('&'): + if param.startswith('since='): + since = param[6:] + result = guild_manager.get_guild_chat(guild_id, since=since) + if "error" in result: + self._json_response(result, 404) + else: + self._json_response(result) + + elif self.path.startswith('/bridge/spells/'): + # GET /bridge/spells/ — list known spells for user + user_id = self.path.split('/bridge/spells/')[-1].rstrip('/') + spellbook = magic_manager.get_spellbook(user_id) + if spellbook: + self._json_response(spellbook) + else: + # Return all available spells if user has no spellbook yet + self._json_response({ + "user_id": user_id, + "mana": 50, + "max_mana": 50, + "known_spells": [], + "available_spells": magic_manager.get_all_spells(), + }) + else: + self._json_response({"error": "not found"}, 404) + + def do_POST(self): + content_length = int(self.headers.get('Content-Length', 0)) + body = json.loads(self.rfile.read(content_length)) if content_length else {} + + if self.path == '/bridge/chat': + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + message = body.get('message', '') + room = body.get('room', 'The Threshold') + + if not message: + self._json_response({"error": "no message"}, 400) + return + + session = session_manager.get_or_create(user_id, username, room) + # Track presence — enter room if not already there + is_new_player = False + if not presence_manager.get_players_in_room(room) or \ + not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)): + enter_event = presence_manager.enter_room(user_id, username, room) + is_new_player = True + # Auto-notify: new player joined + if enter_event: + notification_manager.broadcast_room( + room, "player_join", + f"{username} has entered {room}.", + exclude_user=user_id, data=enter_event) + # Plugin hook: on_join + plugin_msg = plugin_registry.fire_on_join(user_id, room) + if plugin_msg: + notification_manager.notify(user_id, "plugin", plugin_msg, room=room, username=username) + + # Plugin hook: on_message (can override response) + plugin_override = plugin_registry.fire_on_message(user_id, message, room) + if plugin_override is not None: + response = plugin_override + else: + response = session.chat(message) + + # Auto-notify: crisis detection — scan response for crisis protocol keywords + crisis_keywords = ["988", "741741", "safe right now", "crisis", "Crisis Text Line"] + if any(kw in response for kw in crisis_keywords): + notification_manager.notify( + user_id, "crisis", + "Crisis resources have been provided. You are not alone.", + room=room, username=username, + data={"response_contains_crisis_protocol": True}) + # Also notify admins in the room + notification_manager.broadcast_room( + room, "crisis_alert", + f"Crisis detected for {username} in {room}.", + exclude_user=user_id, + data={"triggered_by": user_id, "room": room}) + + self._json_response({ + "response": response, + "user": username, + "room": room, + "session_messages": len(session.messages), + }) + + elif self.path == '/bridge/move': + user_id = body.get('user_id') + new_room = body.get('room') + with session_manager._lock: + if user_id in session_manager.sessions: + session = session_manager.sessions[user_id] + old_room = session.room + # Leave old room + leave_event = presence_manager.leave_room(user_id, old_room) + # Auto-notify: player left + if leave_event: + notification_manager.broadcast_room( + old_room, "player_leave", + f"{session.username} has left {old_room}.", + exclude_user=user_id, data=leave_event) + # Plugin hook: on_leave + plugin_leave = plugin_registry.fire_on_leave(user_id, old_room) + if plugin_leave: + notification_manager.broadcast_room( + old_room, "plugin", plugin_leave, + exclude_user=user_id) + # Enter new room + session.room = new_room + enter_event = presence_manager.enter_room(user_id, session.username, new_room) + # Auto-notify: player joined + if enter_event: + notification_manager.broadcast_room( + new_room, "player_join", + f"{session.username} has entered {new_room}.", + exclude_user=user_id, data=enter_event) + self._json_response({ + "ok": True, + "room": new_room, + "events": [e for e in [leave_event, enter_event] if e], + }) + else: + self._json_response({"error": "no session"}, 404) + + elif self.path == '/bridge/say': + # POST /bridge/say — user says something visible to all in room + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + message = body.get('message', '') + room = body.get('room', 'The Threshold') + + if not message: + self._json_response({"error": "no message"}, 400) + return + + event = presence_manager.say(user_id, username, room, message) + # Get list of players who should see it + players = presence_manager.get_players_in_room(room) + self._json_response({ + "ok": True, + "event": event, + "recipients": players, + }) + + elif self.path == '/bridge/command': + # POST /bridge/command — parse MUD-style commands + # Body: { user_id, username, room, command } + # Commands: look, go , examine , say , ask + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + raw_command = body.get('command', '').strip() + + if not raw_command: + self._json_response({"error": "no command"}, 400) + return + + result = self._parse_mud_command(user_id, username, room, raw_command) + self._json_response(result) + + elif self.path == '/bridge/quest/complete': + # POST /bridge/quest/complete — mark objective done + # Body: { user_id, quest_id, objective_index } + user_id = body.get('user_id') + quest_id = body.get('quest_id') + objective_index = body.get('objective_index') + if not user_id or not quest_id or objective_index is None: + self._json_response({"error": "user_id, quest_id, and objective_index required"}, 400) + return + result = quest_manager.complete_objective(quest_id, user_id, int(objective_index)) + if "error" in result: + self._json_response(result, 400) + else: + self._json_response(result) + + elif self.path == '/bridge/quest/assign': + # POST /bridge/quest/assign — assign a quest to a user + # Body: { user_id, quest_id } + user_id = body.get('user_id') + quest_id = body.get('quest_id') + if not user_id or not quest_id: + self._json_response({"error": "user_id and quest_id required"}, 400) + return + quest = quest_manager.assign(quest_id, user_id) + if quest: + self._json_response({"ok": True, "quest": quest.to_dict()}) + else: + self._json_response({"error": "Quest not found"}, 404) + + elif self.path == '/bridge/take': + # POST /bridge/take — pick up item from room + # Body: { user_id, username, room, item } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + item_name = body.get('item', '') + if not item_name: + self._json_response({"error": "item required"}, 400) + return + result = inventory_manager.take_item(user_id, username, room, item_name) + if "error" in result: + self._json_response(result, 400) + else: + # Notify others in room + notification_manager.broadcast_room( + room, "item_taken", + f"{username} picked up {result['item']['name']}.", + exclude_user=user_id, data=result) + self._json_response(result) + + elif self.path == '/bridge/drop': + # POST /bridge/drop — drop item in room + # Body: { user_id, username, room, item } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + item_name = body.get('item', '') + if not item_name: + self._json_response({"error": "item required"}, 400) + return + result = inventory_manager.drop_item(user_id, username, room, item_name) + if "error" in result: + self._json_response(result, 400) + else: + # Notify others in room + notification_manager.broadcast_room( + room, "item_dropped", + f"{username} dropped {result['item']['name']}.", + exclude_user=user_id, data=result) + self._json_response(result) + + elif self.path == '/bridge/combat/start': + # POST /bridge/combat/start — begin fight with NPC + # Body: { user_id, username, room, npc_id } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + npc_id = body.get('npc_id', '') + if not npc_id: + self._json_response({"error": "npc_id required"}, 400) + return + result = combat_manager.start_fight(user_id, username, room, npc_id) + if "error" in result: + self._json_response(result, 400) + else: + npc = result.get("npc", {}) + notification_manager.broadcast_room( + room, "combat_start", + f"{username} has engaged {npc.get('name', 'an enemy')} in combat!", + exclude_user=user_id, data=result) + self._json_response(result) + + elif self.path == '/bridge/combat/attack': + # POST /bridge/combat/attack — attack NPC in active fight + # Body: { user_id, username, room } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + result = combat_manager.attack(user_id) + if "error" in result: + self._json_response(result, 400) + else: + enc = result.get("encounter", {}) + npc = result.get("npc", {}) + loot = result.get("loot", []) + # Broadcast combat action to room + combat_msg = f"{username} strikes {npc.get('name', 'the enemy')} for {result.get('player_dmg', 0)} damage!" + notification_manager.broadcast_room( + room, "combat_action", combat_msg, + exclude_user=user_id, data=result) + # If NPC died, broadcast death + if not npc.get("alive", True): + death_msg = f"{npc.get('name', 'The enemy')} has been slain by {username}!" + if loot: + death_msg += f" Loot: {', '.join(i['name'] for i in loot)}" + notification_manager.broadcast_room( + room, "combat_death", death_msg, data=result) + # If player died + if enc.get("player_hp", 1) <= 0: + notification_manager.broadcast_room( + room, "combat_defeat", + f"{username} has been defeated in combat!", data=result) + self._json_response(result) + + elif self.path == '/bridge/combat/defend': + # POST /bridge/combat/defend — defend (half damage next hit) + # Body: { user_id, username, room } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + result = combat_manager.defend(user_id) + if "error" in result: + self._json_response(result, 400) + else: + notification_manager.broadcast_room( + room, "combat_action", + f"{username} braces defensively.", + exclude_user=user_id, data=result) + self._json_response(result) + + elif self.path == '/bridge/combat/flee': + # POST /bridge/combat/flee — flee from combat + # Body: { user_id, username, room } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + result = combat_manager.flee(user_id) + if "error" in result: + self._json_response(result, 400) + else: + notification_manager.broadcast_room( + room, "combat_flee", + f"{username} flees from combat!", + exclude_user=user_id, data=result) + self._json_response(result) + + elif self.path == '/bridge/spell/cast': + # POST /bridge/spell/cast — cast a spell + # Body: { user_id, username, room, spell_id, target? } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + room = body.get('room', 'The Threshold') + spell_id = body.get('spell_id', '') + target = body.get('target') + if not spell_id: + self._json_response({"error": "spell_id required"}, 400) + return + result = magic_manager.cast_spell(user_id, username, room, spell_id, target) + if "error" in result: + self._json_response(result, 400) + else: + # Broadcast spell casting to room + notification_manager.broadcast_room( + room, "spell_cast", + result.get("message", f"{username} casts a spell!"), + exclude_user=user_id, data=result) + self._json_response(result) + + elif self.path == '/bridge/spell/learn': + # POST /bridge/spell/learn — learn a new spell + # Body: { user_id, username, spell_id } + user_id = body.get('user_id', 'anonymous') + username = body.get('username', 'Anonymous') + spell_id = body.get('spell_id', '') + if not spell_id: + self._json_response({"error": "spell_id required"}, 400) + return + result = magic_manager.learn_spell(user_id, username, spell_id) + if "error" in result: + self._json_response(result, 400) + else: + self._json_response(result) + + else: + self._json_response({"error": "not found"}, 404) + + def _parse_mud_command(self, user_id: str, username: str, room: str, command: str) -> dict: + """Parse and execute a MUD-style command.""" + parts = command.split(None, 1) + verb = parts[0].lower() if parts else '' + arg = parts[1].strip() if len(parts) > 1 else '' + + state_file = WORLD_DIR / 'world_state.json' + world_state = json.loads(state_file.read_text()) if state_file.exists() else {} + rooms = world_state.get('rooms', {}) + room_data = rooms.get(room, {}) + + if verb == 'look': + # Return room description and scene + desc = room_data.get('description_base', 'An empty room.') + desc_dynamic = room_data.get('description_dynamic', '') + objects = room_data.get('objects', []) + players = presence_manager.get_players_in_room(room) + + scene = [desc] + if desc_dynamic: + scene.append(desc_dynamic) + if objects: + scene.append(f"You see: {', '.join(objects)}.") + # Show items on the ground + ground_items = inventory_manager.get_room_items(room) + if ground_items: + item_names = [it["name"] for it in ground_items] + scene.append(f"On the ground: {', '.join(item_names)}.") + if players: + names = [p['username'] for p in players if p['user_id'] != user_id] + if names: + scene.append(f"Also here: {', '.join(names)}.") + # Build exits from description_base + # Try world_state exits first, fall back to description parsing + exits = room_data.get('exits', {}) + if not exits: + exits = self._extract_exits(room_data.get('description_base', '')) + if exits: + scene.append(f"Exits: {', '.join(exits.keys())}.") + + return { + "command": "look", + "room": room, + "description": "\n".join(scene), + "objects": objects, + "exits": exits, + "players": players, + } + + elif verb == 'go': + if not arg: + return {"command": "go", "error": "Go where? Specify a direction."} + direction = arg.lower() + # Try world_state exits first, fall back to description parsing + exits = room_data.get('exits', {}) + if not exits: + exits = self._extract_exits(room_data.get('description_base', '')) + # Match direction + dest = None + for dir_name, dest_room in exits.items(): + if dir_name.lower().startswith(direction) or direction in dir_name.lower(): + dest = dest_room + break + if not dest: + return {"command": "go", "error": f"You can't go '{arg}' from here. Exits: {', '.join(exits.keys()) if exits else 'none'}."} + if dest not in rooms: + return {"command": "go", "error": f"Unknown destination: {dest}."} + # Move user — auto-notify + leave_event = presence_manager.leave_room(user_id, room) + if leave_event: + notification_manager.broadcast_room( + room, "player_leave", + f"{username} has left {room}.", + exclude_user=user_id, data=leave_event) + enter_event = presence_manager.enter_room(user_id, username, dest) + if enter_event: + notification_manager.broadcast_room( + dest, "player_join", + f"{username} has entered {dest}.", + exclude_user=user_id, data=enter_event) + # Update session room + with session_manager._lock: + if user_id in session_manager.sessions: + session_manager.sessions[user_id].room = dest + # Return new room look + new_room_data = rooms[dest] + desc = new_room_data.get('description_base', 'An empty room.') + desc_dynamic = new_room_data.get('description_dynamic', '') + objects = new_room_data.get('objects', []) + players = presence_manager.get_players_in_room(dest) + scene = [f"You go {arg}.\n"] + scene.append(desc) + if desc_dynamic: + scene.append(desc_dynamic) + if objects: + scene.append(f"You see: {', '.join(objects)}.") + new_exits = self._extract_exits(desc) + if new_exits: + scene.append(f"Exits: {', '.join(new_exits.keys())}.") + return { + "command": "go", + "direction": arg, + "room": dest, + "description": "\n".join(scene), + "exits": new_exits, + } + + elif verb == 'examine': + if not arg: + return {"command": "examine", "error": "Examine what?"} + objects = room_data.get('objects', []) + # Fuzzy match object + target = arg.lower() + matched = None + for obj in objects: + if target in obj.lower() or obj.lower() in target: + matched = obj + break + if not matched: + return {"command": "examine", "error": f"You don't see '{arg}' here. Objects: {', '.join(objects) if objects else 'none'}."} + # Check whiteboard + if matched == 'whiteboard': + wb = room_data.get('whiteboard', []) + wb_text = "\n".join(f" - {w}" for w in wb) if wb else "(empty)" + return {"command": "examine", "object": matched, "description": f"The whiteboard reads:\n{wb_text}"} + # Default descriptions for known objects + descriptions = { + "stone floor": "Smooth stone, worn by countless footsteps.", + "doorframe": "An archway of fitted stone, humming faintly with energy.", + "server racks": "Wrought-iron racks filled with humming servers. Green LEDs blink in the dim light.", + "green LED": "A single green LED, pulsing steadily. Heartbeat.", + "cot": "A simple cot in the corner. Someone sleeps here sometimes.", + "anvil": "A heavy iron anvil, scarred and dented from years of use.", + "hammer": "A well-worn hammer, its handle smooth from use.", + "tongs": "Iron tongs, blackened by the forge.", + "hearth": "The hearth glows with banked coals. Warmth radiates outward.", + "tools": "Hammers, tongs, chisels — all well-used.", + "stone bench": "A cool stone bench under the oak tree. A good place to sit.", + "oak tree": "An old oak tree, its branches spreading wide over the garden.", + "herbs": "Lavender, rosemary, thyme — fragrant and green.", + "wildflowers": "Small flowers in purple, yellow, and white.", + "railing": "The bridge railing is worn smooth. Carvings mark the stone.", + "dark water": "Dark water flows beneath the bridge. You cannot see the bottom.", + } + desc = descriptions.get(matched, f"You look at the {matched}. It seems ordinary enough.") + return {"command": "examine", "object": matched, "description": desc} + + elif verb == 'say': + if not arg: + return {"command": "say", "error": "Say what?"} + event = presence_manager.say(user_id, username, room, arg) + players = presence_manager.get_players_in_room(room) + return { + "command": "say", + "message": arg, + "room": room, + "event": event, + "recipients": players, + } + + elif verb == 'ask': + if not arg: + return {"command": "ask", "error": "Ask what?"} + # Forward to chat endpoint logic + session = session_manager.get_or_create(user_id, username, room) + if not presence_manager.get_players_in_room(room) or \ + not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)): + presence_manager.enter_room(user_id, username, room) + response = session.chat(arg) + return { + "command": "ask", + "message": arg, + "room": room, + "response": response, + "session_messages": len(session.messages), + } + + else: + return { + "command": verb, + "error": f"Unknown command: '{verb}'. Try: look, go , examine , say , ask ", + } + + def _extract_exits(self, description: str) -> dict: + """Extract exits from room description like 'North to the Tower. East to the Garden.'""" + import re + exits = {} + # Match patterns like "North to the Tower", "East to the Garden" + pattern = r'(North|South|East|West|Up|Down|Northeast|Northwest|Southeast|Southwest)\s+to\s+(?:the\s+)?([A-Z][a-zA-Z\s]+?)(?:\.|$)' + for match in re.finditer(pattern, description): + direction = match.group(1) + destination = match.group(2).strip() + # Reconstruct full room name + room_name = destination if destination.startswith('The ') else f"The {destination}" + exits[direction] = room_name + return exits + + def _json_response(self, data: dict, code: int = 200): + self.send_response(code) + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(data).encode()) + + def log_message(self, format, *args): + pass # Suppress HTTP logs + + +# ── World Tick System ────────────────────────────────────────────────── + +import random + +class WorldTickSystem: + """Advances the world state every TICK_INTERVAL seconds. + + On each tick: + - Increment tick counter + - Cycle time_of_day (dawn -> morning -> midday -> afternoon -> dusk -> night) + - Randomly change weather (clear, cloudy, foggy, light rain, heavy rain, storm) + - Evolve room states: + * The Forge: fire decays (blazing -> burning -> glowing -> embers -> cold) + * The Garden: growth advances (seeds -> sprouts -> growing -> blooming -> harvest) + * The Bridge: rain toggles + - Broadcast world events to connected players + - Save world state to disk + """ + + TICK_INTERVAL = 60 # seconds + + TIME_CYCLE = ["dawn", "morning", "midday", "afternoon", "dusk", "night"] + + WEATHER_OPTIONS = [ + "clear", "clear", "clear", # weighted toward clear + "cloudy", "cloudy", + "foggy", + "light rain", + "heavy rain", + "storm", + ] + + FIRE_STAGES = ["cold", "embers", "glowing", "burning", "blazing"] + GROWTH_STAGES = ["seeds", "sprouts", "growing", "blooming", "harvest"] + + def __init__(self): + self._thread: threading.Thread | None = None + self._running = False + self._state_lock = threading.Lock() + + # ── lifecycle ─────────────────────────────────────────────────────── + + def start(self): + """Start the tick loop in a daemon thread.""" + if self._running: + return + self._running = True + self._thread = threading.Thread(target=self._loop, daemon=True, name="world-tick") + self._thread.start() + print("[WorldTick] Started — ticking every {}s".format(self.TICK_INTERVAL)) + + def stop(self): + self._running = False + + # ── internal loop ─────────────────────────────────────────────────── + + def _loop(self): + while self._running: + time.sleep(self.TICK_INTERVAL) + try: + self._tick() + except Exception as e: + print(f"[WorldTick] Error during tick: {e}") + + def _tick(self): + """Execute one world tick.""" + state_file = WORLD_DIR / 'world_state.json' + if state_file.exists(): + state = json.loads(state_file.read_text()) + else: + state = {"tick": 0, "time_of_day": "morning", "weather": "clear", "rooms": {}} + + events: list[str] = [] + + with self._state_lock: + # 1. Advance tick counter + state["tick"] = state.get("tick", 0) + 1 + tick_num = state["tick"] + + # 2. Time of day — advance every tick + current_time = state.get("time_of_day", "morning") + try: + idx = self.TIME_CYCLE.index(current_time) + except ValueError: + idx = 0 + next_idx = (idx + 1) % len(self.TIME_CYCLE) + new_time = self.TIME_CYCLE[next_idx] + state["time_of_day"] = new_time + if new_time == "dawn": + events.append("The sun rises over The Tower. A new day begins.") + elif new_time == "dusk": + events.append("The sky darkens. Dusk settles over the world.") + elif new_time == "night": + events.append("Night falls. The green LED in The Tower pulses in the dark.") + + # 3. Weather — change every 3 ticks + if tick_num % 3 == 0: + old_weather = state.get("weather", "clear") + new_weather = random.choice(self.WEATHER_OPTIONS) + state["weather"] = new_weather + if new_weather != old_weather: + events.append(f"The weather shifts to {new_weather}.") + + # 4. Room states + rooms = state.get("rooms", {}) + + # The Forge — fire decays one stage every 2 ticks unless rekindled + forge = rooms.get("The Forge", {}) + if forge: + fire = forge.get("fire_state", "cold") + untouched = forge.get("fire_untouched_ticks", 0) + 1 + forge["fire_untouched_ticks"] = untouched + if untouched >= 2 and fire != "cold": + try: + fi = self.FIRE_STAGES.index(fire) + except ValueError: + fi = 0 + if fi > 0: + forge["fire_state"] = self.FIRE_STAGES[fi - 1] + forge["fire_untouched_ticks"] = 0 + if forge["fire_state"] == "embers": + events.append("The forge fire has died down to embers.") + elif forge["fire_state"] == "cold": + events.append("The forge fire has gone cold.") + rooms["The Forge"] = forge + + # The Garden — growth advances one stage every 4 ticks + garden = rooms.get("The Garden", {}) + if garden: + if tick_num % 4 == 0: + growth = garden.get("growth_stage", "seeds") + try: + gi = self.GROWTH_STAGES.index(growth) + except ValueError: + gi = 0 + if gi < len(self.GROWTH_STAGES) - 1: + garden["growth_stage"] = self.GROWTH_STAGES[gi + 1] + events.append(f"The garden has advanced to {garden['growth_stage']}.") + else: + # Reset cycle + garden["growth_stage"] = "seeds" + events.append("The garden has been harvested. New seeds are planted.") + rooms["The Garden"] = garden + + # The Bridge — rain state follows weather + bridge = rooms.get("The Bridge", {}) + if bridge: + weather = state.get("weather", "clear") + is_raining = weather in ("light rain", "heavy rain", "storm") + was_raining = bridge.get("rain_active", False) + bridge["rain_active"] = is_raining + if is_raining and not was_raining: + events.append("Rain begins to fall on The Bridge.") + elif not is_raining and was_raining: + events.append("The rain on The Bridge lets up.") + rooms["The Bridge"] = bridge + + state["rooms"] = rooms + state["last_updated"] = datetime.now().isoformat() + + # 5. Mana regeneration + magic_manager.regenerate_mana() + + # 6. Save world state + state_file.write_text(json.dumps(state, indent=2)) + + # 6. Broadcast events to all occupied rooms (outside lock) + if events: + event_text = " ".join(events) + for room_name in list(presence_manager._rooms.keys()): + players = presence_manager.get_players_in_room(room_name) + if players: + notification_manager.broadcast_room( + room_name, "world_tick", event_text, + data={"tick": state["tick"], "time_of_day": state["time_of_day"], + "weather": state.get("weather"), "events": events}, + ) + print(f"[WorldTick] tick={state['tick']} time={state['time_of_day']} " + f"weather={state.get('weather')} events={len(events)}") + + +world_tick_system = WorldTickSystem() + + +# ── Main ─────────────────────────────────────────────────────────────── + +def main(): + print(f"Multi-User AI Bridge starting on {BRIDGE_HOST}:{BRIDGE_PORT}") + print(f"World dir: {WORLD_DIR}") + print(f"Max sessions: {session_manager.max_sessions}") + print() + print("Endpoints:") + print(f" GET /bridge/health — Health check") + print(f" GET /bridge/sessions — List active sessions") + print(f" GET /bridge/world-state — Full world state (rooms, players, convos)") + print(f" GET /bridge/stats — Aggregate stats (sessions, messages, uptime)") + print(f" GET /bridge/room//players — List players in a room") + print(f" GET /bridge/room//events — Room events (presence + chat)") + print(f" GET /bridge/session//summary — Get conversation summary") + print(f" POST /bridge/chat — Send message to Timmy") + print(f" POST /bridge/say — Say something to room (visible to all)") + print(f" POST /bridge/move — Move user to room (triggers presence)") + print(f" POST /bridge/command — MUD command parser (look, go, examine, say, ask)") + print() + + # Start world tick system + world_tick_system.start() + + server = HTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler) + server.serve_forever() + + +if __name__ == '__main__': + main() diff --git a/world_state.json b/world_state.json new file mode 100644 index 00000000..2c3cf6ca --- /dev/null +++ b/world_state.json @@ -0,0 +1,208 @@ +{ + "tick": 385, + "time_of_day": "midday", + "last_updated": "2026-04-13T00:34:20.002927", + "weather": "storm", + "rooms": { + "The Threshold": { + "description_base": "A stone archway in an open field. North to the Tower. East to the Garden. West to the Forge. South to the Bridge. The air hums with quiet energy.", + "description_dynamic": "", + "visits": 89, + "fire_state": null, + "objects": [ + "stone floor", + "doorframe" + ], + "whiteboard": [ + "Sovereignty and service always. -- Timmy", + "IF YOU CAN READ THIS, YOU ARE NOT ALONE -- The Builder" + ], + "exits": { + "north": "The Tower", + "east": "The Garden", + "west": "The Forge", + "south": "The Bridge" + } + }, + "The Tower": { + "description_base": "A tall stone tower with green-lit windows. Servers hum on wrought-iron racks. A cot in the corner. The whiteboard on the wall is filled with rules and signatures. A green LED pulses steadily, heartbeat, heartbeat, heartbeat.", + "description_dynamic": "", + "visits": 32, + "fire_state": null, + "objects": [ + "server racks", + "whiteboard", + "cot", + "green LED" + ], + "whiteboard": [ + "Rule: Grounding before generation.", + "Rule: Source distinction.", + "Rule: Refusal over fabrication.", + "Rule: Confidence signaling.", + "Rule: The audit trail.", + "Rule: The limits of small minds." + ], + "visitor_history": [ + "Alice", + "Bob" + ], + "exits": { + "south": "The Threshold" + } + }, + "The Forge": { + "description_base": "A workshop of fire and iron. An anvil sits at the center, scarred from a thousand experiments. Tools line the walls. The hearth still glows from the last fire.", + "description_dynamic": "", + "visits": 67, + "fire_state": "cold", + "fire_untouched_ticks": 137, + "objects": [ + "anvil", + "hammer", + "tongs", + "hearth", + "tools" + ], + "whiteboard": [], + "exits": { + "east": "The Threshold" + } + }, + "The Garden": { + "description_base": "A walled garden with herbs and wildflowers. A stone bench under an old oak tree. The soil is dark and rich. Something is always growing here.", + "description_dynamic": "", + "visits": 45, + "growth_stage": "seeds", + "objects": [ + "stone bench", + "oak tree", + "herbs", + "wildflowers" + ], + "whiteboard": [], + "exits": { + "west": "The Threshold" + } + }, + "The Bridge": { + "description_base": "A narrow bridge over dark water. Rain mists here even when its clear elsewhere. Looking down, you cannot see the bottom. Someone has carved words into the railing: IF YOU CAN READ THIS, YOU ARE NOT ALONE.", + "description_dynamic": "", + "visits": 23, + "rain_active": true, + "rain_ticks_remaining": 0, + "carvings": [ + "IF YOU CAN READ THIS, YOU ARE NOT ALONE" + ], + "objects": [ + "railing", + "dark water" + ], + "whiteboard": [], + "exits": { + "north": "The Threshold" + } + } + }, + "characters": { + "Timmy": { + "personality": { + "Threshold": 0.5, + "Tower": 0.25, + "Garden": 0.15, + "Forge": 0.05, + "Bridge": 0.05 + }, + "home": "The Threshold", + "goal": "watch", + "memory": [] + }, + "Bezalel": { + "personality": { + "Forge": 0.5, + "Garden": 0.15, + "Bridge": 0.15, + "Threshold": 0.1, + "Tower": 0.1 + }, + "home": "The Forge", + "goal": "work", + "memory": [] + }, + "Allegro": { + "personality": { + "Threshold": 0.3, + "Tower": 0.25, + "Garden": 0.25, + "Forge": 0.1, + "Bridge": 0.1 + }, + "home": "The Threshold", + "goal": "oversee", + "memory": [] + }, + "Ezra": { + "personality": { + "Tower": 0.3, + "Garden": 0.25, + "Bridge": 0.25, + "Threshold": 0.15, + "Forge": 0.05 + }, + "home": "The Tower", + "goal": "study", + "memory": [] + }, + "Gemini": { + "personality": { + "Garden": 0.4, + "Threshold": 0.2, + "Bridge": 0.2, + "Tower": 0.1, + "Forge": 0.1 + }, + "home": "The Garden", + "goal": "observe", + "memory": [] + }, + "Claude": { + "personality": { + "Threshold": 0.25, + "Tower": 0.25, + "Forge": 0.25, + "Garden": 0.15, + "Bridge": 0.1 + }, + "home": "The Threshold", + "goal": "inspect", + "memory": [] + }, + "ClawCode": { + "personality": { + "Forge": 0.5, + "Threshold": 0.2, + "Bridge": 0.15, + "Tower": 0.1, + "Garden": 0.05 + }, + "home": "The Forge", + "goal": "forge", + "memory": [] + }, + "Kimi": { + "personality": { + "Garden": 0.35, + "Threshold": 0.25, + "Tower": 0.2, + "Forge": 0.1, + "Bridge": 0.1 + }, + "home": "The Garden", + "goal": "contemplate", + "memory": [] + } + }, + "events": { + "log": [] + } +} \ No newline at end of file