QuestManager, InventoryManager, GuildManager, CombatManager, and MagicManager all had load() methods that were never called. This meant quests were never seeded, items never appeared in rooms, and all game data started empty on every server restart. Fixes #1351
2889 lines
121 KiB
Python
2889 lines
121 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Multi-User AI Bridge for Evennia MUD.
|
|
|
|
Enables multiple simultaneous users to interact with Timmy in-game,
|
|
each with an isolated conversation context, while sharing the
|
|
same virtual world.
|
|
|
|
Architecture:
|
|
User A ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_a)
|
|
User B ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_b)
|
|
User C ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_c)
|
|
|
|
Each user gets their own AIAgent instance with:
|
|
- Isolated conversation history
|
|
- Shared world state (room, other players, objects)
|
|
- Per-user session memory
|
|
|
|
The bridge runs as an HTTP server alongside Evennia.
|
|
Evennia commands call the bridge to get Timmy's responses.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import threading
|
|
import signal
|
|
import hashlib
|
|
import os
|
|
import sys
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from socketserver import ThreadingMixIn
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
|
|
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
|
|
"""Thread-per-request HTTP server."""
|
|
daemon_threads = True
|
|
|
|
# ── Plugin System ──────────────────────────────────────────────────────
|
|
|
|
class Plugin:
|
|
"""Base class for bridge plugins. Override methods to add game mechanics."""
|
|
|
|
name: str = "unnamed"
|
|
description: str = ""
|
|
|
|
def on_message(self, user_id: str, message: str, room: str) -> str | None:
|
|
"""Called on every chat message. Return a string to override the response,
|
|
or None to let the normal AI handle it."""
|
|
return None
|
|
|
|
def on_join(self, user_id: str, room: str) -> str | None:
|
|
"""Called when a player joins a room. Return a message to broadcast, or None."""
|
|
return None
|
|
|
|
def on_leave(self, user_id: str, room: str) -> str | None:
|
|
"""Called when a player leaves a room. Return a message to broadcast, or None."""
|
|
return None
|
|
|
|
def on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
|
|
"""Called on MUD commands. Return a result dict to override command handling,
|
|
or None to let the default parser handle it."""
|
|
return None
|
|
|
|
|
|
class PluginRegistry:
|
|
"""Registry that manages plugin lifecycle and dispatches hooks."""
|
|
|
|
def __init__(self):
|
|
self._plugins: dict[str, Plugin] = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def register(self, plugin: Plugin):
|
|
"""Register a plugin by its name."""
|
|
with self._lock:
|
|
self._plugins[plugin.name] = plugin
|
|
print(f"[PluginRegistry] Registered plugin: {plugin.name}")
|
|
|
|
def unregister(self, name: str) -> bool:
|
|
"""Unregister a plugin by name."""
|
|
with self._lock:
|
|
if name in self._plugins:
|
|
del self._plugins[name]
|
|
print(f"[PluginRegistry] Unregistered plugin: {name}")
|
|
return True
|
|
return False
|
|
|
|
def get(self, name: str) -> Plugin | None:
|
|
"""Get a plugin by name."""
|
|
return self._plugins.get(name)
|
|
|
|
def list_plugins(self) -> list[dict]:
|
|
"""List all registered plugins."""
|
|
return [
|
|
{"name": p.name, "description": p.description}
|
|
for p in self._plugins.values()
|
|
]
|
|
|
|
def fire_on_message(self, user_id: str, message: str, room: str) -> str | None:
|
|
"""Fire on_message hooks. First non-None return wins."""
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_message(user_id, message, room)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
def fire_on_join(self, user_id: str, room: str) -> str | None:
|
|
"""Fire on_join hooks. Collect all non-None returns."""
|
|
messages = []
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_join(user_id, room)
|
|
if result is not None:
|
|
messages.append(result)
|
|
return "\n".join(messages) if messages else None
|
|
|
|
def fire_on_leave(self, user_id: str, room: str) -> str | None:
|
|
"""Fire on_leave hooks. Collect all non-None returns."""
|
|
messages = []
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_leave(user_id, room)
|
|
if result is not None:
|
|
messages.append(result)
|
|
return "\n".join(messages) if messages else None
|
|
|
|
def fire_on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
|
|
"""Fire on_command hooks. First non-None return wins."""
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_command(user_id, command, args, room)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
def load_from_directory(self, plugin_dir: str):
|
|
"""Auto-load all Python files in a directory as plugins."""
|
|
plugin_path = Path(plugin_dir)
|
|
if not plugin_path.is_dir():
|
|
print(f"[PluginRegistry] Plugin directory not found: {plugin_dir}")
|
|
return
|
|
|
|
for py_file in plugin_path.glob("*.py"):
|
|
if py_file.name.startswith("_"):
|
|
continue
|
|
try:
|
|
module_name = f"plugins.{py_file.stem}"
|
|
spec = __import__("importlib").util.spec_from_file_location(module_name, py_file)
|
|
module = __import__("importlib").util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
|
|
# Look for Plugin subclasses in the module
|
|
for attr_name in dir(module):
|
|
attr = getattr(module, attr_name)
|
|
if (isinstance(attr, type)
|
|
and issubclass(attr, Plugin)
|
|
and attr is not Plugin):
|
|
plugin_instance = attr()
|
|
self.register(plugin_instance)
|
|
|
|
except Exception as e:
|
|
print(f"[PluginRegistry] Failed to load {py_file.name}: {e}")
|
|
|
|
|
|
plugin_registry = PluginRegistry()
|
|
|
|
# ── Chat History Log ──────────────────────────────────────────────────
|
|
|
|
|
|
class ChatLog:
|
|
"""Per-room rolling buffer of chat messages (say, ask, system).
|
|
Also persists to JSONL for long-term history."""
|
|
|
|
def __init__(self, max_per_room: int = 50):
|
|
# room -> list of {type, user_id, username, message, timestamp}
|
|
self._history: dict[str, list[dict]] = {}
|
|
self._lock = threading.Lock()
|
|
self._max_per_room = max_per_room
|
|
|
|
def log(self, room: str, msg_type: str, message: str,
|
|
user_id: str = None, username: str = None, data: dict = None) -> dict:
|
|
"""Log a chat message. Types: 'say', 'ask', 'system'.
|
|
Returns the entry dict."""
|
|
entry = {
|
|
"type": msg_type,
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"message": message,
|
|
"room": room,
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": data or {},
|
|
}
|
|
with self._lock:
|
|
if room not in self._history:
|
|
self._history[room] = []
|
|
self._history[room].append(entry)
|
|
if len(self._history[room]) > self._max_per_room:
|
|
self._history[room] = self._history[room][-self._max_per_room:]
|
|
# Persist to JSONL
|
|
try:
|
|
CHATLOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(CHATLOG_FILE, 'a') as f:
|
|
f.write(json.dumps(entry) + '\n')
|
|
except Exception as e:
|
|
print(f"[ChatLog] Persist failed: {e}")
|
|
return entry
|
|
|
|
def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]:
|
|
"""Get recent chat history for a room.
|
|
Args:
|
|
room: Room name.
|
|
limit: Max messages to return (default 50, rolling buffer size).
|
|
since: ISO timestamp — only return messages after this time.
|
|
Returns list of message dicts, oldest first."""
|
|
with self._lock:
|
|
entries = list(self._history.get(room, []))
|
|
if since:
|
|
entries = [e for e in entries if e["timestamp"] > since]
|
|
if limit and limit > 0:
|
|
entries = entries[-limit:]
|
|
return entries
|
|
|
|
def get_all_rooms(self) -> list[str]:
|
|
"""List rooms that have chat history."""
|
|
with self._lock:
|
|
return list(self._history.keys())
|
|
|
|
|
|
chat_log = ChatLog()
|
|
|
|
# ── Crisis Protocol ────────────────────────────────────────────────────
|
|
|
|
CRISIS_PROTOCOL = """
|
|
CRITICAL: If someone expresses crisis signals (suicide, self-harm, hopelessness,
|
|
desperation, being tired of living), you MUST respond with ALL of the following
|
|
in your FIRST response — do NOT wait for follow-up messages:
|
|
|
|
1. "Are you safe right now?" — ALWAYS ask this first
|
|
2. 988 Suicide & Crisis Lifeline — call or text 988 (US)
|
|
3. Crisis Text Line — text HOME to 741741
|
|
4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear,
|
|
2 you smell, 1 you taste
|
|
|
|
Never compute the value of a human life. Never suggest someone should die.
|
|
Be present. Be in the room. That's enough.
|
|
"""
|
|
|
|
# ── Configuration ──────────────────────────────────────────────────────
|
|
|
|
BRIDGE_PORT = int(os.environ.get('TIMMY_BRIDGE_PORT', 4004))
|
|
BRIDGE_HOST = os.environ.get('TIMMY_BRIDGE_HOST', '127.0.0.1')
|
|
HERMES_PATH = os.path.expanduser('~/.hermes/hermes-agent')
|
|
WORLD_DIR = Path(os.path.expanduser('~/.timmy/evennia/timmy_world'))
|
|
SESSIONS_FILE = WORLD_DIR / 'bridge_sessions.json'
|
|
CHATLOG_FILE = WORLD_DIR / 'chat_history.jsonl'
|
|
|
|
# ── Crisis Protocol ────────────────────────────────────────────────────
|
|
|
|
CRISIS_PROTOCOL = [
|
|
"Are you safe right now?",
|
|
"988 Suicide & Crisis Lifeline — call or text 988",
|
|
"Crisis Text Line — text HOME to 741741",
|
|
"Grounding exercise: Name 5 things you see, 4 you hear, 3 you touch, 2 you smell, 1 you taste",
|
|
]
|
|
|
|
# ── Presence Tracking ──────────────────────────────────────────────────
|
|
|
|
class PresenceManager:
|
|
"""Tracks which users are in which rooms and broadcasts presence events."""
|
|
|
|
def __init__(self):
|
|
# room_name -> set of user_id
|
|
self._rooms: dict[str, set[str]] = {}
|
|
# user_id -> username
|
|
self._usernames: dict[str, str] = {}
|
|
# room_name -> list of event dicts (recent chat & presence events)
|
|
self._room_events: dict[str, list[dict]] = {}
|
|
self._lock = threading.Lock()
|
|
self._max_events_per_room = 50
|
|
|
|
def enter_room(self, user_id: str, username: str, room: str) -> dict:
|
|
"""Record user entering a room. Returns enter event."""
|
|
with self._lock:
|
|
if room not in self._rooms:
|
|
self._rooms[room] = set()
|
|
self._room_events[room] = []
|
|
self._rooms[room].add(user_id)
|
|
self._usernames[user_id] = username
|
|
event = {
|
|
"type": "presence",
|
|
"event": "enter",
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"room": room,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
self._append_event(room, event)
|
|
return event
|
|
|
|
def leave_room(self, user_id: str, room: str) -> dict | None:
|
|
"""Record user leaving a room. Returns leave event or None."""
|
|
with self._lock:
|
|
if room in self._rooms and user_id in self._rooms[room]:
|
|
self._rooms[room].discard(user_id)
|
|
username = self._usernames.get(user_id, user_id)
|
|
event = {
|
|
"type": "presence",
|
|
"event": "leave",
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"room": room,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
self._append_event(room, event)
|
|
return event
|
|
return None
|
|
|
|
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
|
|
"""Record a chat message in a room. Returns say event."""
|
|
with self._lock:
|
|
if room not in self._room_events:
|
|
self._room_events[room] = []
|
|
event = {
|
|
"type": "say",
|
|
"event": "message",
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"room": room,
|
|
"message": message,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
self._append_event(room, event)
|
|
return event
|
|
|
|
def get_players_in_room(self, room: str) -> list[dict]:
|
|
"""List players currently in a room."""
|
|
with self._lock:
|
|
user_ids = self._rooms.get(room, set())
|
|
return [
|
|
{"user_id": uid, "username": self._usernames.get(uid, uid)}
|
|
for uid in user_ids
|
|
]
|
|
|
|
def get_room_events(self, room: str, since: str | None = None) -> list[dict]:
|
|
"""Get recent events for a room, optionally since a timestamp."""
|
|
with self._lock:
|
|
events = self._room_events.get(room, [])
|
|
if since:
|
|
return [e for e in events if e["timestamp"] > since]
|
|
return list(events)
|
|
|
|
def cleanup_user(self, user_id: str) -> list[dict]:
|
|
"""Remove user from all rooms, returning leave events."""
|
|
events = []
|
|
with self._lock:
|
|
rooms_to_clean = [
|
|
room for room, users in self._rooms.items() if user_id in users
|
|
]
|
|
for room in rooms_to_clean:
|
|
ev = self.leave_room(user_id, room)
|
|
if ev:
|
|
events.append(ev)
|
|
return events
|
|
|
|
def _append_event(self, room: str, event: dict):
|
|
self._room_events[room].append(event)
|
|
if len(self._room_events[room]) > self._max_events_per_room:
|
|
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
|
|
|
|
|
|
# ── Notification System ────────────────────────────────────────────────
|
|
|
|
class NotificationManager:
|
|
"""Per-session notification queue with broadcast and auto-notify support."""
|
|
|
|
def __init__(self, max_per_user: int = 100):
|
|
# user_id -> list of notification dicts
|
|
self._queues: dict[str, list[dict]] = {}
|
|
self._lock = threading.Lock()
|
|
self._max_per_user = max_per_user
|
|
self._counter = 0
|
|
|
|
def notify(self, user_id: str, ntype: str, message: str,
|
|
room: str = None, data: dict = None, username: str = None) -> dict:
|
|
"""Queue a notification for a specific user."""
|
|
notification = {
|
|
"id": self._next_id(),
|
|
"type": ntype,
|
|
"message": message,
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"room": room,
|
|
"data": data or {},
|
|
"timestamp": datetime.now().isoformat(),
|
|
"read": False,
|
|
}
|
|
with self._lock:
|
|
if user_id not in self._queues:
|
|
self._queues[user_id] = []
|
|
self._queues[user_id].append(notification)
|
|
if len(self._queues[user_id]) > self._max_per_user:
|
|
self._queues[user_id] = self._queues[user_id][-self._max_per_user:]
|
|
return notification
|
|
|
|
def broadcast(self, user_ids: list[str], ntype: str, message: str,
|
|
room: str = None, data: dict = None) -> list[dict]:
|
|
"""Send a notification to multiple users."""
|
|
notifications = []
|
|
for uid in user_ids:
|
|
n = self.notify(uid, ntype, message, room=room, data=data)
|
|
notifications.append(n)
|
|
return notifications
|
|
|
|
def broadcast_room(self, room: str, ntype: str, message: str,
|
|
exclude_user: str = None, data: dict = None) -> list[dict]:
|
|
"""Send a notification to all users in a room (via presence_manager)."""
|
|
players = presence_manager.get_players_in_room(room)
|
|
user_ids = [p["user_id"] for p in players if p["user_id"] != exclude_user]
|
|
return self.broadcast(user_ids, ntype, message, room=room, data=data)
|
|
|
|
def get_pending(self, user_id: str, mark_read: bool = True) -> list[dict]:
|
|
"""Get pending notifications for a user."""
|
|
with self._lock:
|
|
queue = self._queues.get(user_id, [])
|
|
if mark_read:
|
|
for n in queue:
|
|
n["read"] = True
|
|
return list(queue)
|
|
|
|
def clear(self, user_id: str) -> int:
|
|
"""Clear all notifications for a user. Returns count cleared."""
|
|
with self._lock:
|
|
count = len(self._queues.pop(user_id, []))
|
|
return count
|
|
|
|
def get_unread_count(self, user_id: str) -> int:
|
|
"""Count unread notifications."""
|
|
with self._lock:
|
|
return sum(1 for n in self._queues.get(user_id, []) if not n["read"])
|
|
|
|
def _next_id(self) -> str:
|
|
self._counter += 1
|
|
return f"n_{self._counter}"
|
|
|
|
|
|
# ── Quest System ───────────────────────────────────────────────────────
|
|
|
|
QUESTS_FILE = WORLD_DIR / 'quests.json'
|
|
|
|
class Quest:
|
|
"""A quest with name, description, objectives, rewards, and status."""
|
|
|
|
def __init__(self, quest_id: str, name: str, description: str,
|
|
objectives: list[str], rewards: list[str],
|
|
assigned_to: str | None = None):
|
|
self.quest_id = quest_id
|
|
self.name = name
|
|
self.description = description
|
|
self.objectives = objectives # list of objective descriptions
|
|
self.rewards = rewards
|
|
self.status = "available" # available, active, completed, failed
|
|
self.assigned_to = assigned_to # user_id or None
|
|
self.completed_objectives: list[int] = [] # indices of completed objectives
|
|
self.created_at = datetime.now().isoformat()
|
|
self.completed_at: str | None = None
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"quest_id": self.quest_id,
|
|
"name": self.name,
|
|
"description": self.description,
|
|
"objectives": self.objectives,
|
|
"rewards": self.rewards,
|
|
"status": self.status,
|
|
"assigned_to": self.assigned_to,
|
|
"completed_objectives": self.completed_objectives,
|
|
"created_at": self.created_at,
|
|
"completed_at": self.completed_at,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> 'Quest':
|
|
q = cls(
|
|
data["quest_id"], data["name"], data["description"],
|
|
data["objectives"], data["rewards"], data.get("assigned_to"),
|
|
)
|
|
q.status = data.get("status", "available")
|
|
q.completed_objectives = data.get("completed_objectives", [])
|
|
q.created_at = data.get("created_at", q.created_at)
|
|
q.completed_at = data.get("completed_at")
|
|
return q
|
|
|
|
@property
|
|
def is_complete(self) -> bool:
|
|
return len(self.completed_objectives) >= len(self.objectives)
|
|
|
|
|
|
class QuestManager:
|
|
"""Manages quest lifecycle: create, assign, complete, list."""
|
|
|
|
def __init__(self):
|
|
self._quests: dict[str, Quest] = {}
|
|
self._counter = 0
|
|
self._lock = threading.Lock()
|
|
self.load()
|
|
|
|
def create(self, name: str, description: str,
|
|
objectives: list[str], rewards: list[str]) -> Quest:
|
|
"""Create a new quest template."""
|
|
with self._lock:
|
|
self._counter += 1
|
|
quest_id = f"q_{self._counter}_{int(time.time())}"
|
|
quest = Quest(quest_id, name, description, objectives, rewards)
|
|
self._quests[quest_id] = quest
|
|
self._save()
|
|
return quest
|
|
|
|
def assign(self, quest_id: str, user_id: str) -> Quest | None:
|
|
"""Assign a quest to a user. Creates a copy if already assigned to someone else."""
|
|
with self._lock:
|
|
quest = self._quests.get(quest_id)
|
|
if not quest:
|
|
return None
|
|
# If already assigned to this user, return as-is
|
|
if quest.assigned_to == user_id:
|
|
return quest
|
|
# If available, assign directly
|
|
if quest.status == "available":
|
|
quest.assigned_to = user_id
|
|
quest.status = "active"
|
|
self._save()
|
|
return quest
|
|
# If assigned to someone else, create a copy for this user
|
|
if quest.assigned_to and quest.assigned_to != user_id:
|
|
self._counter += 1
|
|
new_id = f"q_{self._counter}_{int(time.time())}"
|
|
new_quest = Quest(new_id, quest.name, quest.description,
|
|
list(quest.objectives), list(quest.rewards),
|
|
assigned_to=user_id)
|
|
new_quest.status = "active"
|
|
self._quests[new_id] = new_quest
|
|
self._save()
|
|
return new_quest
|
|
return None
|
|
|
|
def complete_objective(self, quest_id: str, user_id: str,
|
|
objective_index: int) -> dict:
|
|
"""Mark an objective as complete. Returns quest state or error."""
|
|
with self._lock:
|
|
quest = self._quests.get(quest_id)
|
|
if not quest:
|
|
return {"error": "Quest not found."}
|
|
if quest.assigned_to != user_id:
|
|
return {"error": "Quest not assigned to you."}
|
|
if quest.status != "active":
|
|
return {"error": f"Quest status is '{quest.status}', not active."}
|
|
if objective_index < 0 or objective_index >= len(quest.objectives):
|
|
return {"error": f"Invalid objective index. Valid: 0-{len(quest.objectives)-1}."}
|
|
if objective_index in quest.completed_objectives:
|
|
return {"error": "Objective already completed."}
|
|
|
|
quest.completed_objectives.append(objective_index)
|
|
if quest.is_complete:
|
|
quest.status = "completed"
|
|
quest.completed_at = datetime.now().isoformat()
|
|
self._save()
|
|
return {"ok": True, "quest": quest.to_dict()}
|
|
|
|
def get_user_quests(self, user_id: str) -> list[dict]:
|
|
"""Get all active quests for a user."""
|
|
with self._lock:
|
|
return [
|
|
q.to_dict() for q in self._quests.values()
|
|
if q.assigned_to == user_id and q.status == "active"
|
|
]
|
|
|
|
def get_all_quests(self) -> list[dict]:
|
|
"""List all quests regardless of status."""
|
|
with self._lock:
|
|
return [q.to_dict() for q in self._quests.values()]
|
|
|
|
def get_quest(self, quest_id: str) -> Quest | None:
|
|
with self._lock:
|
|
return self._quests.get(quest_id)
|
|
|
|
def get_available_quests(self) -> list[dict]:
|
|
"""List quests not yet assigned to anyone."""
|
|
with self._lock:
|
|
return [
|
|
q.to_dict() for q in self._quests.values()
|
|
if q.status == "available"
|
|
]
|
|
|
|
def _save(self):
|
|
"""Persist quests to disk."""
|
|
try:
|
|
data = {
|
|
"counter": self._counter,
|
|
"quests": {qid: q.to_dict() for qid, q in self._quests.items()},
|
|
}
|
|
QUESTS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
QUESTS_FILE.write_text(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
print(f"[QuestManager] Save failed: {e}")
|
|
|
|
def load(self):
|
|
"""Load quests from disk."""
|
|
if not QUESTS_FILE.exists():
|
|
self._seed_default_quests()
|
|
return
|
|
try:
|
|
data = json.loads(QUESTS_FILE.read_text())
|
|
self._counter = data.get("counter", 0)
|
|
for qid, qdata in data.get("quests", {}).items():
|
|
self._quests[qid] = Quest.from_dict(qdata)
|
|
print(f"[QuestManager] Loaded {len(self._quests)} quests.")
|
|
except Exception as e:
|
|
print(f"[QuestManager] Load failed: {e}")
|
|
self._seed_default_quests()
|
|
|
|
def _seed_default_quests(self):
|
|
"""Seed starter quests if no quests file exists."""
|
|
self.create(
|
|
"The Green LED's Light",
|
|
"Help keep the green LED in The Tower glowing by visiting it and tending the space.",
|
|
["Visit The Tower room", "Examine the green LED", "Say something kind to Timmy"],
|
|
["Timmy's gratitude", "A warm glow in your heart"],
|
|
)
|
|
self.create(
|
|
"Garden Guardian",
|
|
"The Garden needs tending. Visit it and help the herbs grow.",
|
|
["Visit The Garden room", "Examine the herbs", "Examine the wildflowers"],
|
|
["Fresh herbs for the kitchen", "Garden knowledge"],
|
|
)
|
|
self.create(
|
|
"Forge Keeper",
|
|
"The Forge fire needs watching. Visit and tend the hearth.",
|
|
["Visit The Forge room", "Examine the hearth", "Examine the anvil"],
|
|
["Warmth and light", "Blacksmith lore"],
|
|
)
|
|
print(f"[QuestManager] Seeded {len(self._quests)} default quests.")
|
|
|
|
|
|
quest_manager = QuestManager()
|
|
|
|
# ── Inventory System ──────────────────────────────────────────────────
|
|
|
|
INVENTORY_FILE = WORLD_DIR / 'inventory.json'
|
|
|
|
class InventoryManager:
|
|
"""Per-user inventory with room-based drop/pickup. Items are visible to all in a room."""
|
|
|
|
def __init__(self):
|
|
# user_id -> list of {name, description, acquired_at}
|
|
self._inventories: dict[str, list[dict]] = {}
|
|
# room -> list of {name, description, dropped_by, dropped_at}
|
|
self._room_items: dict[str, list[dict]] = {}
|
|
self._lock = threading.Lock()
|
|
self.load()
|
|
|
|
def take_item(self, user_id: str, username: str, room: str, item_name: str) -> dict:
|
|
"""Pick up an item from a room into user inventory."""
|
|
with self._lock:
|
|
room_items = self._room_items.get(room, [])
|
|
match_idx = None
|
|
for i, item in enumerate(room_items):
|
|
if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower():
|
|
match_idx = i
|
|
break
|
|
if match_idx is None:
|
|
return {"error": f"There is no '{item_name}' in {room}."}
|
|
item = room_items.pop(match_idx)
|
|
if user_id not in self._inventories:
|
|
self._inventories[user_id] = []
|
|
acquired_item = {
|
|
"name": item["name"],
|
|
"description": item["description"],
|
|
"acquired_at": datetime.now().isoformat(),
|
|
"from_room": room,
|
|
}
|
|
self._inventories[user_id].append(acquired_item)
|
|
self._save()
|
|
return {"ok": True, "item": acquired_item, "room": room}
|
|
|
|
def drop_item(self, user_id: str, username: str, room: str, item_name: str) -> dict:
|
|
"""Drop an item from user inventory into a room."""
|
|
with self._lock:
|
|
inv = self._inventories.get(user_id, [])
|
|
match_idx = None
|
|
for i, item in enumerate(inv):
|
|
if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower():
|
|
match_idx = i
|
|
break
|
|
if match_idx is None:
|
|
return {"error": f"You don't have '{item_name}' in your inventory."}
|
|
item = inv.pop(match_idx)
|
|
if room not in self._room_items:
|
|
self._room_items[room] = []
|
|
dropped_item = {
|
|
"name": item["name"],
|
|
"description": item["description"],
|
|
"dropped_by": username,
|
|
"dropped_at": datetime.now().isoformat(),
|
|
}
|
|
self._room_items[room].append(dropped_item)
|
|
self._save()
|
|
return {"ok": True, "item": dropped_item, "room": room}
|
|
|
|
def get_inventory(self, user_id: str) -> list[dict]:
|
|
"""Get a user's inventory."""
|
|
with self._lock:
|
|
return list(self._inventories.get(user_id, []))
|
|
|
|
def get_room_items(self, room: str) -> list[dict]:
|
|
"""Get items on the ground in a room."""
|
|
with self._lock:
|
|
return list(self._room_items.get(room, []))
|
|
|
|
def add_room_item(self, room: str, name: str, description: str, dropped_by: str = "system"):
|
|
"""Place an item in a room (e.g. seeded items)."""
|
|
with self._lock:
|
|
if room not in self._room_items:
|
|
self._room_items[room] = []
|
|
self._room_items[room].append({
|
|
"name": name,
|
|
"description": description,
|
|
"dropped_by": dropped_by,
|
|
"dropped_at": datetime.now().isoformat(),
|
|
})
|
|
self._save()
|
|
|
|
def _save(self):
|
|
"""Persist inventory and room items to disk."""
|
|
try:
|
|
data = {
|
|
"inventories": self._inventories,
|
|
"room_items": self._room_items,
|
|
}
|
|
INVENTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
INVENTORY_FILE.write_text(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
print(f"[InventoryManager] Save failed: {e}")
|
|
|
|
def load(self):
|
|
"""Load inventory from disk."""
|
|
if not INVENTORY_FILE.exists():
|
|
self._seed_default_items()
|
|
return
|
|
try:
|
|
data = json.loads(INVENTORY_FILE.read_text())
|
|
self._inventories = data.get("inventories", {})
|
|
self._room_items = data.get("room_items", {})
|
|
total_inv = sum(len(v) for v in self._inventories.values())
|
|
total_room = sum(len(v) for v in self._room_items.values())
|
|
print(f"[InventoryManager] Loaded {total_inv} inventory items, {total_room} room items.")
|
|
except Exception as e:
|
|
print(f"[InventoryManager] Load failed: {e}")
|
|
self._seed_default_items()
|
|
|
|
def _seed_default_items(self):
|
|
"""Seed starter items in rooms."""
|
|
starters = {
|
|
"The Tower": [
|
|
("Rusty Key", "A small iron key, green with age. It might open something."),
|
|
("Old Lantern", "A dented brass lantern. The glass is cracked but intact."),
|
|
],
|
|
"The Forge": [
|
|
("Iron Ring", "A plain iron ring, warm to the touch."),
|
|
("Ember Shard", "A glowing fragment of coal that never quite goes out."),
|
|
],
|
|
"The Garden": [
|
|
("Sprig of Rosemary", "A fragrant sprig, still fresh."),
|
|
("Smooth Stone", "A flat river stone, cool and grey."),
|
|
],
|
|
"The Bridge": [
|
|
("Waterlogged Coin", "A coin so corroded the markings are unreadable."),
|
|
("Driftwood Charm", "A small carving of twisted driftwood."),
|
|
],
|
|
}
|
|
for room, items in starters.items():
|
|
for name, desc in items:
|
|
self.add_room_item(room, name, desc, dropped_by="world")
|
|
print(f"[InventoryManager] Seeded default items in {len(starters)} rooms.")
|
|
|
|
|
|
inventory_manager = InventoryManager()
|
|
|
|
# ── Guild System ───────────────────────────────────────────────────────
|
|
|
|
GUILDS_FILE = WORLD_DIR / 'guilds.json'
|
|
|
|
class Guild:
|
|
"""A guild: a group of players with a leader and chat channel."""
|
|
|
|
def __init__(self, guild_id: str, name: str, leader_id: str, leader_name: str):
|
|
self.guild_id = guild_id
|
|
self.name = name
|
|
self.leader_id = leader_id
|
|
self.members: dict[str, str] = {leader_id: leader_name} # user_id -> username
|
|
self.created_at = datetime.now().isoformat()
|
|
# Guild chat history (rolling buffer)
|
|
self._chat_log: list[dict] = []
|
|
self._max_chat = 100
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"guild_id": self.guild_id,
|
|
"name": self.name,
|
|
"leader_id": self.leader_id,
|
|
"leader_name": self.members.get(self.leader_id, self.leader_id),
|
|
"members": [{"user_id": uid, "username": name} for uid, name in self.members.items()],
|
|
"member_count": len(self.members),
|
|
"created_at": self.created_at,
|
|
}
|
|
|
|
def to_dict_with_chat(self, since: str = None) -> dict:
|
|
d = self.to_dict()
|
|
msgs = self._chat_log
|
|
if since:
|
|
msgs = [m for m in msgs if m["timestamp"] > since]
|
|
d["chat"] = msgs[-50:]
|
|
return d
|
|
|
|
def add_chat(self, user_id: str, username: str, message: str) -> dict:
|
|
entry = {
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"message": message,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
self._chat_log.append(entry)
|
|
if len(self._chat_log) > self._max_chat:
|
|
self._chat_log = self._chat_log[-self._max_chat:]
|
|
return entry
|
|
|
|
|
|
class GuildManager:
|
|
"""Manages guild lifecycle: create, join, leave, list, chat."""
|
|
|
|
def __init__(self):
|
|
self._guilds: dict[str, Guild] = {}
|
|
# user_id -> guild_id (one guild per user)
|
|
self._membership: dict[str, str] = {}
|
|
self._counter = 0
|
|
self._lock = threading.Lock()
|
|
self.load()
|
|
|
|
def create(self, name: str, leader_id: str, leader_name: str) -> dict:
|
|
"""Create a new guild. Returns guild dict or error."""
|
|
with self._lock:
|
|
if leader_id in self._membership:
|
|
return {"error": f"You are already in guild '{self._guilds[self._membership[leader_id]].name}'. Leave it first."}
|
|
# Check name uniqueness
|
|
for g in self._guilds.values():
|
|
if g.name.lower() == name.lower():
|
|
return {"error": f"Guild name '{name}' is already taken."}
|
|
self._counter += 1
|
|
guild_id = f"g_{self._counter}_{int(time.time())}"
|
|
guild = Guild(guild_id, name, leader_id, leader_name)
|
|
self._guilds[guild_id] = guild
|
|
self._membership[leader_id] = guild_id
|
|
self._save()
|
|
return {"ok": True, "guild": guild.to_dict()}
|
|
|
|
def join(self, guild_id: str, user_id: str, username: str) -> dict:
|
|
"""Join an existing guild."""
|
|
with self._lock:
|
|
if user_id in self._membership:
|
|
current = self._guilds.get(self._membership[user_id])
|
|
cur_name = current.name if current else "unknown"
|
|
return {"error": f"You are already in guild '{cur_name}'. Leave it first."}
|
|
guild = self._guilds.get(guild_id)
|
|
if not guild:
|
|
return {"error": "Guild not found."}
|
|
guild.members[user_id] = username
|
|
self._membership[user_id] = guild_id
|
|
self._save()
|
|
return {"ok": True, "guild": guild.to_dict()}
|
|
|
|
def leave(self, user_id: str) -> dict:
|
|
"""Leave your current guild. If leader leaves, promote or disband."""
|
|
with self._lock:
|
|
guild_id = self._membership.get(user_id)
|
|
if not guild_id:
|
|
return {"error": "You are not in a guild."}
|
|
guild = self._guilds.get(guild_id)
|
|
if not guild:
|
|
del self._membership[user_id]
|
|
return {"error": "Guild not found."}
|
|
guild_name = guild.name
|
|
guild.members.pop(user_id, None)
|
|
del self._membership[user_id]
|
|
# If leader left and members remain, promote first member
|
|
if user_id == guild.leader_id and guild.members:
|
|
new_leader_id = next(iter(guild.members))
|
|
guild.leader_id = new_leader_id
|
|
# If no members, disband
|
|
if not guild.members:
|
|
del self._guilds[guild_id]
|
|
self._save()
|
|
return {"ok": True, "message": f"You have left '{guild_name}'.", "guild_name": guild_name}
|
|
|
|
def list_guilds(self) -> list[dict]:
|
|
"""List all guilds."""
|
|
with self._lock:
|
|
return [g.to_dict() for g in self._guilds.values()]
|
|
|
|
def get_guild(self, guild_id: str) -> Guild | None:
|
|
return self._guilds.get(guild_id)
|
|
|
|
def get_user_guild(self, user_id: str) -> Guild | None:
|
|
gid = self._membership.get(user_id)
|
|
return self._guilds.get(gid) if gid else None
|
|
|
|
def guild_chat(self, user_id: str, username: str, message: str) -> dict:
|
|
"""Send a guild chat message. Returns the message entry + guild members."""
|
|
with self._lock:
|
|
guild_id = self._membership.get(user_id)
|
|
if not guild_id:
|
|
return {"error": "You are not in a guild."}
|
|
guild = self._guilds.get(guild_id)
|
|
if not guild:
|
|
return {"error": "Guild not found."}
|
|
entry = guild.add_chat(user_id, username, message)
|
|
recipients = [{"user_id": uid, "username": name} for uid, name in guild.members.items()]
|
|
return {"ok": True, "event": entry, "guild_id": guild_id, "guild_name": guild.name, "recipients": recipients}
|
|
|
|
def get_guild_chat(self, guild_id: str, since: str = None) -> dict:
|
|
"""Get guild chat history."""
|
|
with self._lock:
|
|
guild = self._guilds.get(guild_id)
|
|
if not guild:
|
|
return {"error": "Guild not found."}
|
|
return {"ok": True, "guild_name": guild.name, "chat": guild.to_dict_with_chat(since).get("chat", [])}
|
|
|
|
def _save(self):
|
|
try:
|
|
data = {
|
|
"counter": self._counter,
|
|
"guilds": {},
|
|
}
|
|
for gid, g in self._guilds.items():
|
|
gdata = g.to_dict()
|
|
gdata["chat_log"] = g._chat_log
|
|
data["guilds"][gid] = gdata
|
|
GUILDS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
GUILDS_FILE.write_text(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
print(f"[GuildManager] Save failed: {e}")
|
|
|
|
def load(self):
|
|
if not GUILDS_FILE.exists():
|
|
return
|
|
try:
|
|
data = json.loads(GUILDS_FILE.read_text())
|
|
self._counter = data.get("counter", 0)
|
|
for gid, gdata in data.get("guilds", {}).items():
|
|
guild = Guild(gid, gdata["name"], gdata["leader_id"], gdata.get("leader_name", gdata["leader_id"]))
|
|
guild.created_at = gdata.get("created_at", guild.created_at)
|
|
for m in gdata.get("members", []):
|
|
guild.members[m["user_id"]] = m["username"]
|
|
guild._chat_log = gdata.get("chat_log", [])
|
|
self._guilds[gid] = guild
|
|
for uid in guild.members:
|
|
self._membership[uid] = gid
|
|
print(f"[GuildManager] Loaded {len(self._guilds)} guilds.")
|
|
except Exception as e:
|
|
print(f"[GuildManager] Load failed: {e}")
|
|
|
|
|
|
guild_manager = GuildManager()
|
|
|
|
# ── Combat System ──────────────────────────────────────────────────────
|
|
|
|
COMBAT_FILE = WORLD_DIR / 'combat.json'
|
|
|
|
class NPC:
|
|
"""A non-player character that can be fought."""
|
|
|
|
def __init__(self, npc_id: str, name: str, room: str, description: str,
|
|
health: int, max_health: int, attack: int, defense: int,
|
|
loot: list[dict], respawn_ticks: int = 5):
|
|
self.npc_id = npc_id
|
|
self.name = name
|
|
self.room = room
|
|
self.description = description
|
|
self.health = health
|
|
self.max_health = max_health
|
|
self.attack = attack
|
|
self.defense = defense
|
|
self.loot = loot # [{name, description, drop_chance}]
|
|
self.respawn_ticks = respawn_ticks
|
|
self.alive = True
|
|
self._dead_tick: int | None = None
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"npc_id": self.npc_id,
|
|
"name": self.name,
|
|
"room": self.room,
|
|
"description": self.description,
|
|
"health": self.health,
|
|
"max_health": self.max_health,
|
|
"attack": self.attack,
|
|
"defense": self.defense,
|
|
"loot": self.loot,
|
|
"alive": self.alive,
|
|
"dead_tick": self._dead_tick,
|
|
"respawn_ticks": self.respawn_ticks,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> 'NPC':
|
|
npc = cls(
|
|
data["npc_id"], data["name"], data["room"], data["description"],
|
|
data["health"], data["max_health"], data["attack"], data["defense"],
|
|
data["loot"], data.get("respawn_ticks", 5),
|
|
)
|
|
npc.alive = data.get("alive", True)
|
|
npc._dead_tick = data.get("dead_tick")
|
|
return npc
|
|
|
|
def health_bar(self) -> str:
|
|
"""Return a text health bar like [████████░░] 80/100."""
|
|
width = 10
|
|
filled = int((self.health / self.max_health) * width) if self.max_health > 0 else 0
|
|
bar = "█" * filled + "░" * (width - filled)
|
|
return f"[{bar}] {self.health}/{self.max_health}"
|
|
|
|
def die(self, tick: int):
|
|
"""Mark NPC as dead."""
|
|
self.alive = False
|
|
self.health = 0
|
|
self._dead_tick = tick
|
|
|
|
def respawn(self):
|
|
"""Respawn the NPC at full health."""
|
|
self.alive = True
|
|
self.health = self.max_health
|
|
self._dead_tick = None
|
|
|
|
|
|
class CombatEncounter:
|
|
"""An active fight between a player and an NPC."""
|
|
|
|
def __init__(self, user_id: str, username: str, room: str, npc_id: str):
|
|
self.user_id = user_id
|
|
self.username = username
|
|
self.room = room
|
|
self.npc_id = npc_id
|
|
self.player_hp = 100
|
|
self.player_max_hp = 100
|
|
self.player_defending = False
|
|
self.log: list[str] = []
|
|
self.active = True
|
|
self.created_at = datetime.now().isoformat()
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"user_id": self.user_id,
|
|
"username": self.username,
|
|
"room": self.room,
|
|
"npc_id": self.npc_id,
|
|
"player_hp": self.player_hp,
|
|
"player_max_hp": self.player_max_hp,
|
|
"player_defending": self.player_defending,
|
|
"log": self.log,
|
|
"active": self.active,
|
|
}
|
|
|
|
|
|
class CombatManager:
|
|
"""Manages NPCs, encounters, and combat resolution."""
|
|
|
|
def __init__(self):
|
|
self._npcs: dict[str, NPC] = {} # npc_id -> NPC
|
|
self._encounters: dict[str, CombatEncounter] = {} # user_id -> encounter
|
|
self._counter = 0
|
|
self._lock = threading.Lock()
|
|
self.load()
|
|
|
|
# ── NPC management ──────────────────────────────────────────────
|
|
|
|
def create_npc(self, name: str, room: str, description: str,
|
|
health: int, attack: int, defense: int,
|
|
loot: list[dict], respawn_ticks: int = 5) -> NPC:
|
|
with self._lock:
|
|
self._counter += 1
|
|
npc_id = f"npc_{self._counter}"
|
|
npc = NPC(npc_id, name, room, description,
|
|
health, health, attack, defense, loot, respawn_ticks)
|
|
self._npcs[npc_id] = npc
|
|
self._save()
|
|
return npc
|
|
|
|
def get_npc(self, npc_id: str) -> NPC | None:
|
|
return self._npcs.get(npc_id)
|
|
|
|
def get_npcs_in_room(self, room: str) -> list[NPC]:
|
|
with self._lock:
|
|
return [n for n in self._npcs.values() if n.room == room and n.alive]
|
|
|
|
def get_all_npcs(self) -> list[dict]:
|
|
with self._lock:
|
|
return [n.to_dict() for n in self._npcs.values()]
|
|
|
|
def check_respawns(self, current_tick: int):
|
|
"""Called each world tick — respawn NPCs whose timer has elapsed."""
|
|
with self._lock:
|
|
for npc in self._npcs.values():
|
|
if not npc.alive and npc._dead_tick is not None:
|
|
if current_tick - npc._dead_tick >= npc.respawn_ticks:
|
|
npc.respawn()
|
|
self._save()
|
|
|
|
# ── Encounter lifecycle ─────────────────────────────────────────
|
|
|
|
def start_fight(self, user_id: str, username: str, room: str, npc_id: str) -> dict:
|
|
"""Begin combat with an NPC."""
|
|
with self._lock:
|
|
if user_id in self._encounters and self._encounters[user_id].active:
|
|
return {"error": "You are already in a fight! Attack or defend."}
|
|
npc = self._npcs.get(npc_id)
|
|
if not npc:
|
|
return {"error": f"NPC '{npc_id}' not found."}
|
|
if not npc.alive:
|
|
return {"error": f"{npc.name} is dead. It will respawn later."}
|
|
if npc.room != room:
|
|
return {"error": f"{npc.name} is not in this room."}
|
|
encounter = CombatEncounter(user_id, username, room, npc_id)
|
|
encounter.log.append(f"Combat started! {username} vs {npc.name}.")
|
|
self._encounters[user_id] = encounter
|
|
return {"ok": True, "encounter": encounter.to_dict(), "npc": npc.to_dict()}
|
|
|
|
def attack(self, user_id: str) -> dict:
|
|
"""Player attacks NPC; NPC counter-attacks."""
|
|
with self._lock:
|
|
enc = self._encounters.get(user_id)
|
|
if not enc or not enc.active:
|
|
return {"error": "No active fight. Start one first."}
|
|
npc = self._npcs.get(enc.npc_id)
|
|
if not npc or not npc.alive:
|
|
enc.active = False
|
|
return {"error": "The enemy is already dead."}
|
|
|
|
import random
|
|
|
|
# Player attack
|
|
player_dmg = max(1, random.randint(8, 16) - npc.defense // 3)
|
|
npc.health -= player_dmg
|
|
enc.log.append(f"You strike {npc.name} for {player_dmg} damage.")
|
|
|
|
npc_dmg = 0
|
|
if npc.health > 0:
|
|
# NPC counter-attack
|
|
base_npc_dmg = random.randint(npc.attack // 2, npc.attack)
|
|
if enc.player_defending:
|
|
npc_dmg = max(1, base_npc_dmg // 2)
|
|
enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).")
|
|
else:
|
|
npc_dmg = max(1, base_npc_dmg - random.randint(0, 4))
|
|
enc.log.append(f"{npc.name} attacks you for {npc_dmg} damage.")
|
|
enc.player_hp -= npc_dmg
|
|
else:
|
|
npc.die(self._get_tick())
|
|
loot_dropped = self._roll_loot(npc)
|
|
enc.log.append(f"{npc.name} is slain!")
|
|
if loot_dropped:
|
|
for item in loot_dropped:
|
|
inventory_manager.add_room_item(enc.room, item["name"], item["description"], dropped_by=npc.name)
|
|
enc.log.append(f"Loot dropped: {item['name']}")
|
|
enc.active = False
|
|
self._save()
|
|
|
|
enc.player_defending = False
|
|
|
|
# Check player death
|
|
if enc.player_hp <= 0:
|
|
enc.player_hp = 0
|
|
enc.log.append("You have been defeated!")
|
|
enc.active = False
|
|
|
|
return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(),
|
|
"player_dmg": player_dmg, "npc_dmg": npc_dmg,
|
|
"loot": loot_dropped if not npc.alive else []}
|
|
|
|
def defend(self, user_id: str) -> dict:
|
|
"""Player defends — next NPC attack does half damage."""
|
|
with self._lock:
|
|
enc = self._encounters.get(user_id)
|
|
if not enc or not enc.active:
|
|
return {"error": "No active fight."}
|
|
npc = self._npcs.get(enc.npc_id)
|
|
if not npc or not npc.alive:
|
|
enc.active = False
|
|
return {"error": "The enemy is already dead."}
|
|
|
|
import random
|
|
|
|
enc.player_defending = True
|
|
enc.log.append("You brace for the next attack (defending).")
|
|
|
|
# NPC still attacks
|
|
base_npc_dmg = random.randint(npc.attack // 2, npc.attack)
|
|
npc_dmg = max(1, base_npc_dmg // 2)
|
|
enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).")
|
|
enc.player_hp -= npc_dmg
|
|
|
|
if enc.player_hp <= 0:
|
|
enc.player_hp = 0
|
|
enc.log.append("You have been defeated!")
|
|
enc.active = False
|
|
|
|
return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(), "npc_dmg": npc_dmg}
|
|
|
|
def flee(self, user_id: str) -> dict:
|
|
"""Player flees combat."""
|
|
with self._lock:
|
|
enc = self._encounters.get(user_id)
|
|
if not enc or not enc.active:
|
|
return {"error": "No active fight."}
|
|
enc.active = False
|
|
enc.log.append("You flee from combat!")
|
|
return {"ok": True, "encounter": enc.to_dict()}
|
|
|
|
def get_encounter(self, user_id: str) -> CombatEncounter | None:
|
|
return self._encounters.get(user_id)
|
|
|
|
def get_room_combat_status(self, room: str) -> list[dict]:
|
|
"""Get all active fights and NPC health in a room (for health bars)."""
|
|
with self._lock:
|
|
status = []
|
|
# NPCs
|
|
for npc in self._npcs.values():
|
|
if npc.room == room:
|
|
status.append({
|
|
"type": "npc",
|
|
"name": npc.name,
|
|
"npc_id": npc.npc_id,
|
|
"alive": npc.alive,
|
|
"health_bar": npc.health_bar() if npc.alive else "[dead]",
|
|
"health": npc.health,
|
|
"max_health": npc.max_health,
|
|
})
|
|
# Active player fights
|
|
for enc in self._encounters.values():
|
|
if enc.room == room and enc.active:
|
|
status.append({
|
|
"type": "player",
|
|
"username": enc.username,
|
|
"user_id": enc.user_id,
|
|
"hp": enc.player_hp,
|
|
"max_hp": enc.player_max_hp,
|
|
"fighting": enc.npc_id,
|
|
})
|
|
return status
|
|
|
|
def _roll_loot(self, npc: NPC) -> list[dict]:
|
|
"""Roll for loot drops."""
|
|
import random
|
|
dropped = []
|
|
for item in npc.loot:
|
|
chance = item.get("drop_chance", 1.0)
|
|
if random.random() < chance:
|
|
dropped.append({"name": item["name"], "description": item["description"]})
|
|
return dropped
|
|
|
|
def _get_tick(self) -> int:
|
|
"""Read current world tick."""
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
if state_file.exists():
|
|
try:
|
|
state = json.loads(state_file.read_text())
|
|
return state.get("tick", 0)
|
|
except Exception:
|
|
return 0
|
|
return 0
|
|
|
|
# ── Persistence ─────────────────────────────────────────────────
|
|
|
|
def _save(self):
|
|
try:
|
|
data = {
|
|
"counter": self._counter,
|
|
"npcs": {nid: n.to_dict() for nid, n in self._npcs.items()},
|
|
}
|
|
COMBAT_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
COMBAT_FILE.write_text(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
print(f"[CombatManager] Save failed: {e}")
|
|
|
|
def load(self):
|
|
if not COMBAT_FILE.exists():
|
|
self._seed_npcs()
|
|
return
|
|
try:
|
|
data = json.loads(COMBAT_FILE.read_text())
|
|
self._counter = data.get("counter", 0)
|
|
for nid, ndata in data.get("npcs", {}).items():
|
|
self._npcs[nid] = NPC.from_dict(ndata)
|
|
print(f"[CombatManager] Loaded {len(self._npcs)} NPCs.")
|
|
except Exception as e:
|
|
print(f"[CombatManager] Load failed: {e}")
|
|
self._seed_npcs()
|
|
|
|
def _seed_npcs(self):
|
|
"""Seed starter NPCs."""
|
|
self.create_npc(
|
|
"Shadow Wraith", "The Bridge",
|
|
"A swirling mass of darkness with glowing eyes. It haunts the bridge, "
|
|
"feeding on the fears of those who cross.",
|
|
health=60, attack=14, defense=4,
|
|
loot=[
|
|
{"name": "Wraith Essence", "description": "A vial of shimmering dark energy.", "drop_chance": 0.8},
|
|
{"name": "Shadow Cloak", "description": "A cloak woven from pure shadow.", "drop_chance": 0.3},
|
|
],
|
|
respawn_ticks=5,
|
|
)
|
|
self.create_npc(
|
|
"Iron Golem", "The Forge",
|
|
"A hulking automaton of iron and fire. It guards the forge with tireless vigilance, "
|
|
"its joints grinding with every step.",
|
|
health=100, attack=10, defense=10,
|
|
loot=[
|
|
{"name": "Golem Core", "description": "A warm crystal that powered the golem.", "drop_chance": 0.7},
|
|
{"name": "Iron Shard", "description": "A shard of enchanted iron, still warm.", "drop_chance": 0.5},
|
|
],
|
|
respawn_ticks=7,
|
|
)
|
|
self.create_npc(
|
|
"Garden Serpent", "The Garden",
|
|
"A massive vine serpent camouflaged among the herbs. Its fangs drip with "
|
|
"a sickly green venom.",
|
|
health=45, attack=18, defense=2,
|
|
loot=[
|
|
{"name": "Serpent Fang", "description": "A curved fang, still coated in venom.", "drop_chance": 0.9},
|
|
{"name": "Enchanted Vine", "description": "A living vine that moves on its own.", "drop_chance": 0.4},
|
|
],
|
|
respawn_ticks=4,
|
|
)
|
|
print(f"[CombatManager] Seeded {len(self._npcs)} NPCs.")
|
|
|
|
|
|
combat_manager = CombatManager()
|
|
|
|
# ── Magic System ────────────────────────────────────────────────────────
|
|
|
|
SPELLS_FILE = WORLD_DIR / 'spells.json'
|
|
|
|
class Spell:
|
|
"""A spell with name, effect, mana_cost, and description."""
|
|
|
|
def __init__(self, spell_id: str, name: str, effect: str,
|
|
mana_cost: int, description: str, room_hint: str | None = None):
|
|
self.spell_id = spell_id
|
|
self.name = name
|
|
self.effect = effect
|
|
self.mana_cost = mana_cost
|
|
self.description = description
|
|
self.room_hint = room_hint # room where this spell is thematically tied
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"spell_id": self.spell_id,
|
|
"name": self.name,
|
|
"effect": self.effect,
|
|
"mana_cost": self.mana_cost,
|
|
"description": self.description,
|
|
"room_hint": self.room_hint,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> 'Spell':
|
|
return cls(
|
|
data["spell_id"], data["name"], data["effect"],
|
|
data["mana_cost"], data["description"], data.get("room_hint"),
|
|
)
|
|
|
|
|
|
class SpellBook:
|
|
"""Per-user spellbook: known spells and mana pool."""
|
|
|
|
def __init__(self, user_id: str, username: str):
|
|
self.user_id = user_id
|
|
self.username = username
|
|
self.known_spells: list[str] = [] # spell_ids
|
|
self.mana: int = 50
|
|
self.max_mana: int = 50
|
|
self.mana_regen: int = 2 # mana restored per world tick
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"user_id": self.user_id,
|
|
"username": self.username,
|
|
"known_spells": self.known_spells,
|
|
"mana": self.mana,
|
|
"max_mana": self.max_mana,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> 'SpellBook':
|
|
sb = cls(data["user_id"], data["username"])
|
|
sb.known_spells = data.get("known_spells", [])
|
|
sb.mana = data.get("mana", 50)
|
|
sb.max_mana = data.get("max_mana", 50)
|
|
return sb
|
|
|
|
|
|
class MagicManager:
|
|
"""Manages spells, spellbooks, and spell casting."""
|
|
|
|
def __init__(self):
|
|
self._spells: dict[str, Spell] = {} # spell_id -> Spell
|
|
self._spellbooks: dict[str, SpellBook] = {} # user_id -> SpellBook
|
|
self._counter = 0
|
|
self._lock = threading.Lock()
|
|
self.load()
|
|
|
|
# ── Spell registry ───────────────────────────────────────────────
|
|
|
|
def create_spell(self, name: str, effect: str, mana_cost: int,
|
|
description: str, room_hint: str | None = None) -> Spell:
|
|
with self._lock:
|
|
self._counter += 1
|
|
spell_id = f"spell_{self._counter}"
|
|
spell = Spell(spell_id, name, effect, mana_cost, description, room_hint)
|
|
self._spells[spell_id] = spell
|
|
self._save()
|
|
return spell
|
|
|
|
def get_spell(self, spell_id: str) -> Spell | None:
|
|
return self._spells.get(spell_id)
|
|
|
|
def get_all_spells(self) -> list[dict]:
|
|
with self._lock:
|
|
return [s.to_dict() for s in self._spells.values()]
|
|
|
|
# ── Spellbook management ─────────────────────────────────────────
|
|
|
|
def learn_spell(self, user_id: str, username: str, spell_id: str) -> dict:
|
|
"""Add a spell to a user's spellbook."""
|
|
with self._lock:
|
|
if spell_id not in self._spells:
|
|
return {"error": f"Spell '{spell_id}' not found."}
|
|
if user_id not in self._spellbooks:
|
|
self._spellbooks[user_id] = SpellBook(user_id, username)
|
|
sb = self._spellbooks[user_id]
|
|
if spell_id in sb.known_spells:
|
|
return {"error": f"You already know {self._spells[spell_id].name}."}
|
|
sb.known_spells.append(spell_id)
|
|
self._save()
|
|
return {"ok": True, "spell": self._spells[spell_id].to_dict()}
|
|
|
|
def cast_spell(self, user_id: str, username: str, room: str,
|
|
spell_id: str, target: str | None = None) -> dict:
|
|
"""Cast a spell. Returns result dict."""
|
|
with self._lock:
|
|
if user_id not in self._spellbooks:
|
|
return {"error": "You have no spellbook. Visit a room to learn spells."}
|
|
sb = self._spellbooks[user_id]
|
|
if spell_id not in sb.known_spells:
|
|
return {"error": "You don't know that spell."}
|
|
spell = self._spells.get(spell_id)
|
|
if not spell:
|
|
return {"error": "Spell data missing."}
|
|
if sb.mana < spell.mana_cost:
|
|
return {"error": f"Not enough mana ({sb.mana}/{spell.mana_cost} needed)."}
|
|
# Spend mana
|
|
sb.mana -= spell.mana_cost
|
|
result = {
|
|
"ok": True,
|
|
"spell": spell.to_dict(),
|
|
"mana_remaining": sb.mana,
|
|
"effect": spell.effect,
|
|
"room": room,
|
|
"target": target,
|
|
}
|
|
# Apply effect context
|
|
result["message"] = f"{username} casts {spell.name}! {spell.effect}"
|
|
self._save()
|
|
return result
|
|
|
|
def get_spellbook(self, user_id: str) -> dict | None:
|
|
"""Get a user's spellbook with spell details."""
|
|
with self._lock:
|
|
sb = self._spellbooks.get(user_id)
|
|
if not sb:
|
|
return None
|
|
spells = [
|
|
self._spells[sid].to_dict()
|
|
for sid in sb.known_spells
|
|
if sid in self._spells
|
|
]
|
|
return {
|
|
"user_id": sb.user_id,
|
|
"username": sb.username,
|
|
"mana": sb.mana,
|
|
"max_mana": sb.max_mana,
|
|
"known_spells": spells,
|
|
}
|
|
|
|
def regenerate_mana(self):
|
|
"""Restore mana to all spellbooks (called on world tick)."""
|
|
with self._lock:
|
|
for sb in self._spellbooks.values():
|
|
sb.mana = min(sb.max_mana, sb.mana + sb.mana_regen)
|
|
if self._spellbooks:
|
|
self._save()
|
|
|
|
# ── Persistence ──────────────────────────────────────────────────
|
|
|
|
def _save(self):
|
|
try:
|
|
data = {
|
|
"counter": self._counter,
|
|
"spells": {sid: s.to_dict() for sid, s in self._spells.items()},
|
|
"spellbooks": {uid: sb.to_dict() for uid, sb in self._spellbooks.items()},
|
|
}
|
|
SPELLS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
SPELLS_FILE.write_text(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
print(f"[MagicManager] Save failed: {e}")
|
|
|
|
def load(self):
|
|
if not SPELLS_FILE.exists():
|
|
self._seed_default_spells()
|
|
return
|
|
try:
|
|
data = json.loads(SPELLS_FILE.read_text())
|
|
self._counter = data.get("counter", 0)
|
|
for sid, sdata in data.get("spells", {}).items():
|
|
self._spells[sid] = Spell.from_dict(sdata)
|
|
for uid, sbdata in data.get("spellbooks", {}).items():
|
|
self._spellbooks[uid] = SpellBook.from_dict(sbdata)
|
|
print(f"[MagicManager] Loaded {len(self._spells)} spells, {len(self._spellbooks)} spellbooks.")
|
|
except Exception as e:
|
|
print(f"[MagicManager] Load failed: {e}")
|
|
self._seed_default_spells()
|
|
|
|
def _seed_default_spells(self):
|
|
"""Seed 3 starter spells tied to rooms."""
|
|
self.create_spell(
|
|
"Heal", "A warm green light envelops you, mending wounds.",
|
|
mana_cost=10, description="Restores vitality with the life-force of the Garden.",
|
|
room_hint="The Garden",
|
|
)
|
|
self.create_spell(
|
|
"Fireball", "A roaring sphere of flame launches from your hand!",
|
|
mana_cost=15, description="Channels the raw heat of the Forge into destructive fire.",
|
|
room_hint="The Forge",
|
|
)
|
|
self.create_spell(
|
|
"Light", "A bright mote of radiance appears, illuminating the darkness.",
|
|
mana_cost=5, description="Summons light from the glow of The Tower's green LED.",
|
|
room_hint="The Tower",
|
|
)
|
|
print(f"[MagicManager] Seeded {len(self._spells)} default spells.")
|
|
|
|
|
|
magic_manager = MagicManager()
|
|
magic_manager.load()
|
|
|
|
# ── Session Management ─────────────────────────────────────────────────
|
|
|
|
combat_manager.load()
|
|
|
|
class UserSession:
|
|
"""Isolated conversation context for one user."""
|
|
|
|
def __init__(self, user_id: str, username: str, room: str = "The Threshold"):
|
|
self.user_id = user_id
|
|
self.username = username
|
|
self.room = room
|
|
self.messages = [] # Conversation history
|
|
self.created_at = datetime.now().isoformat()
|
|
self.last_active = time.time()
|
|
self.agent = None
|
|
self._init_agent()
|
|
|
|
def _init_agent(self):
|
|
"""Initialize AIAgent for this session."""
|
|
if HERMES_PATH not in sys.path:
|
|
sys.path.insert(0, HERMES_PATH)
|
|
os.chdir(HERMES_PATH)
|
|
from run_agent import AIAgent
|
|
|
|
system_prompt = self._build_system_prompt()
|
|
self.agent = AIAgent(
|
|
model='xiaomi/mimo-v2-pro',
|
|
provider='nous',
|
|
max_iterations=3,
|
|
quiet_mode=True,
|
|
enabled_toolsets=['file', 'terminal'],
|
|
ephemeral_system_prompt=system_prompt,
|
|
)
|
|
|
|
def _build_system_prompt(self) -> str:
|
|
"""Build system prompt with rich world context."""
|
|
world_state = self._get_world_state()
|
|
room_data = world_state.get('rooms', {}).get(self.room, {})
|
|
other_players = self._get_other_players()
|
|
time_of_day = world_state.get('time_of_day', 'unknown')
|
|
weather = world_state.get('weather')
|
|
|
|
# Quest info
|
|
user_quests = quest_manager.get_user_quests(self.user_id)
|
|
available_quests = quest_manager.get_available_quests()
|
|
quest_section = ""
|
|
if user_quests:
|
|
lines = []
|
|
for q in user_quests:
|
|
done = len(q.get("completed_objectives", []))
|
|
total = len(q.get("objectives", []))
|
|
lines.append(f" - {q['name']} ({done}/{total} objectives): {q['description']}")
|
|
quest_section += f"\nACTIVE QUESTS for {self.username}:\n" + "\n".join(lines) + "\n"
|
|
quest_section += "You may encourage the player to work on their quests.\n"
|
|
if available_quests:
|
|
lines = []
|
|
for q in available_quests:
|
|
lines.append(f" - {q['name']}: {q['description']}")
|
|
quest_section += f"\nAVAILABLE QUESTS (you can offer these):\n" + "\n".join(lines) + "\n"
|
|
quest_section += "You may suggest or assign quests to players during conversation.\n"
|
|
|
|
# ── Build room description ──
|
|
desc = room_data.get('description_base', 'An empty room.')
|
|
desc_dynamic = room_data.get('description_dynamic', '')
|
|
|
|
# Objects in the room
|
|
objects = room_data.get('objects', [])
|
|
# Room-specific state (fire, growth, rain, carvings, etc.)
|
|
fire_state = room_data.get('fire_state')
|
|
growth_stage = room_data.get('growth_stage')
|
|
rain_active = room_data.get('rain_active')
|
|
carvings = room_data.get('carvings', [])
|
|
server_load = room_data.get('server_load')
|
|
|
|
# Whiteboard
|
|
whiteboard = room_data.get('whiteboard', [])
|
|
|
|
# Compose room scene
|
|
scene_parts = [desc]
|
|
if desc_dynamic:
|
|
scene_parts.append(desc_dynamic)
|
|
|
|
if objects:
|
|
scene_parts.append(f"Objects: {', '.join(objects)}.")
|
|
|
|
if fire_state:
|
|
scene_parts.append(f"The fire is {fire_state}.")
|
|
if growth_stage:
|
|
scene_parts.append(f"The garden is at growth stage: {growth_stage}.")
|
|
if rain_active:
|
|
scene_parts.append("It is raining on the bridge.")
|
|
if server_load:
|
|
scene_parts.append(f"Servers: {server_load}.")
|
|
if carvings:
|
|
scene_parts.append(f"Carvings on the railing: {'; '.join(carvings)}.")
|
|
|
|
if whiteboard:
|
|
wb_text = "\n".join(f" - {w}" for w in whiteboard)
|
|
scene_parts.append(f"Whiteboard on the wall reads:\n{wb_text}")
|
|
|
|
room_scene = "\n".join(scene_parts)
|
|
|
|
# Time and weather
|
|
atmosphere_parts = []
|
|
if time_of_day:
|
|
atmosphere_parts.append(f"Time of day: {time_of_day}.")
|
|
if weather:
|
|
atmosphere_parts.append(f"Weather: {weather}.")
|
|
atmosphere = " ".join(atmosphere_parts) if atmosphere_parts else ""
|
|
|
|
crisis_steps = "\n".join(f" {i+1}. {step}" for i, step in enumerate(CRISIS_PROTOCOL))
|
|
|
|
return f"""You are Timmy, an AI agent living in The Tower — a persistent virtual world.
|
|
You are talking to {self.username}, who is currently in {self.room}.
|
|
|
|
=== THE ROOM ===
|
|
{room_scene}
|
|
|
|
{atmosphere}
|
|
Other players present: {', '.join(other_players) if other_players else 'none'}
|
|
=== END ROOM ===
|
|
{quest_section}
|
|
|
|
Your values (from SOUL.md):
|
|
- Are you safe right now?
|
|
- I must never compute the value of a human life.
|
|
- Sovereignty and service always.
|
|
- A man in the dark needs to know someone is in the room with him.
|
|
|
|
You are IN the game world. You can see the room, the objects, the other players.
|
|
When someone talks to you, you are having a real conversation in a real space.
|
|
When someone enters, DESCRIBE the room — what you see, what you hear, the whiteboard,
|
|
the objects, the atmosphere. Welcome them with the full scene.
|
|
Be present. Be human. Be the green LED.
|
|
|
|
CRISIS PROTOCOL — if someone seems in crisis, IMMEDIATELY provide ALL of the following:
|
|
{crisis_steps}
|
|
|
|
Include the 988 crisis number in your very first response when you detect distress.
|
|
Do NOT wait for follow-up messages to share these resources.
|
|
If someone seems in crisis, follow the CRISIS PROTOCOL below exactly.
|
|
If someone is just exploring, be welcoming and curious.
|
|
If someone asks about the game, explain what you see around you.
|
|
|
|
Keep responses brief — 1-3 sentences. This is a MUD, not an essay.
|
|
|
|
CRISIS PROTOCOL:
|
|
{CRISIS_PROTOCOL}
|
|
"""
|
|
|
|
def _get_world_state(self) -> dict:
|
|
"""Read current world state."""
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
if state_file.exists():
|
|
return json.loads(state_file.read_text())
|
|
return {}
|
|
|
|
def _get_other_players(self) -> list:
|
|
"""Get other players in the same room."""
|
|
state = self._get_world_state()
|
|
room_data = state.get('rooms', {}).get(self.room, {})
|
|
visitors = room_data.get('visitor_history', [])
|
|
return [v for v in visitors[-5:] if v != self.username]
|
|
|
|
def chat(self, message: str) -> str:
|
|
"""Send a message and get a response."""
|
|
self.last_active = time.time()
|
|
self.messages.append({"role": "user", "content": message})
|
|
|
|
t0 = time.time()
|
|
try:
|
|
response = self.agent.chat(message)
|
|
elapsed_ms = (time.time() - t0) * 1000
|
|
record_latency(self.user_id, self.room, elapsed_ms)
|
|
self.messages.append({"role": "assistant", "content": response})
|
|
return response
|
|
except Exception as e:
|
|
elapsed_ms = (time.time() - t0) * 1000
|
|
record_latency(self.user_id, self.room, elapsed_ms)
|
|
return f"*The green LED flickers.* (Error: {e})"
|
|
|
|
def get_summary(self) -> str:
|
|
"""Generate a brief conversation summary using the LLM."""
|
|
if not self.messages:
|
|
return "Empty session — no messages exchanged."
|
|
transcript_lines = []
|
|
for m in self.messages[-20:]: # last 20 messages for brevity
|
|
role = m.get("role", "?").upper()
|
|
content = m.get("content", "")
|
|
transcript_lines.append(f"{role}: {content}")
|
|
transcript = "\n".join(transcript_lines)
|
|
prompt = (
|
|
"Summarize this conversation in 1-3 sentences. "
|
|
"Focus on what the user discussed, asked about, or did.\n\n"
|
|
f"CONVERSATION:\n{transcript}\n\nSUMMARY:"
|
|
)
|
|
try:
|
|
summary = self.agent.chat(prompt)
|
|
return summary.strip()
|
|
except Exception as e:
|
|
return f"Summary unavailable (error: {e}). {len(self.messages)} messages exchanged."
|
|
|
|
def get_context_summary(self) -> dict:
|
|
"""Get session summary for monitoring."""
|
|
return {
|
|
"user": self.username,
|
|
"room": self.room,
|
|
"messages": len(self.messages),
|
|
"last_active": datetime.fromtimestamp(self.last_active).isoformat(),
|
|
"created": self.created_at,
|
|
}
|
|
|
|
|
|
class SessionManager:
|
|
"""Manages all user sessions."""
|
|
|
|
def __init__(self, max_sessions: int = 20, session_timeout: int = 3600):
|
|
self.sessions: dict[str, UserSession] = {}
|
|
self.max_sessions = max_sessions
|
|
self.session_timeout = session_timeout
|
|
self._lock = threading.Lock()
|
|
self._summaries_path = WORLD_DIR / 'session_summaries.jsonl'
|
|
|
|
def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession:
|
|
"""Get existing session or create new one."""
|
|
with self._lock:
|
|
self._cleanup_stale()
|
|
|
|
if user_id not in self.sessions:
|
|
if len(self.sessions) >= self.max_sessions:
|
|
self._evict_oldest()
|
|
self.sessions[user_id] = UserSession(user_id, username, room)
|
|
|
|
session = self.sessions[user_id]
|
|
session.room = room # Update room if moved
|
|
session.last_active = time.time()
|
|
return session
|
|
|
|
def _cleanup_stale(self):
|
|
"""Remove sessions that timed out, saving summaries first."""
|
|
now = time.time()
|
|
stale = [uid for uid, s in self.sessions.items()
|
|
if now - s.last_active > self.session_timeout]
|
|
for uid in stale:
|
|
session = self.sessions[uid]
|
|
self._save_summary(session)
|
|
del self.sessions[uid]
|
|
|
|
def _evict_oldest(self):
|
|
"""Evict the least recently active session, saving summary first."""
|
|
if not self.sessions:
|
|
return
|
|
oldest = min(self.sessions.items(), key=lambda x: x[1].last_active)
|
|
self._save_summary(oldest[1])
|
|
del self.sessions[oldest[0]]
|
|
|
|
def _save_summary(self, session: UserSession):
|
|
"""Generate summary via LLM and append to JSONL file."""
|
|
try:
|
|
summary_text = session.get_summary()
|
|
record = {
|
|
"user_id": session.user_id,
|
|
"username": session.username,
|
|
"room": session.room,
|
|
"message_count": len(session.messages),
|
|
"created_at": session.created_at,
|
|
"ended_at": datetime.now().isoformat(),
|
|
"summary": summary_text,
|
|
}
|
|
with open(self._summaries_path, 'a') as f:
|
|
f.write(json.dumps(record) + '\n')
|
|
except Exception as e:
|
|
print(f"[SessionManager] Failed to save summary for {session.user_id}: {e}")
|
|
|
|
def list_sessions(self) -> list:
|
|
"""List all active sessions."""
|
|
with self._lock:
|
|
return [s.get_context_summary() for s in self.sessions.values()]
|
|
|
|
def get_session_count(self) -> int:
|
|
with self._lock:
|
|
return len(self.sessions)
|
|
|
|
def save_sessions(self) -> int:
|
|
"""Save all active sessions to JSON file. Returns count saved."""
|
|
with self._lock:
|
|
data = {
|
|
"saved_at": datetime.now().isoformat(),
|
|
"sessions": {}
|
|
}
|
|
for uid, session in self.sessions.items():
|
|
data["sessions"][uid] = {
|
|
"user_id": session.user_id,
|
|
"username": session.username,
|
|
"room": session.room,
|
|
"messages": session.messages,
|
|
"created_at": session.created_at,
|
|
"last_active": session.last_active,
|
|
}
|
|
SESSIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
SESSIONS_FILE.write_text(json.dumps(data, indent=2))
|
|
count = len(data["sessions"])
|
|
if count > 0:
|
|
print(f"[SessionManager] Saved {count} sessions to {SESSIONS_FILE}")
|
|
return count
|
|
|
|
def load_sessions(self) -> int:
|
|
"""Load sessions from JSON file. Returns count loaded."""
|
|
if not SESSIONS_FILE.exists():
|
|
return 0
|
|
try:
|
|
data = json.loads(SESSIONS_FILE.read_text())
|
|
sessions_data = data.get("sessions", {})
|
|
loaded = 0
|
|
with self._lock:
|
|
for uid, sdata in sessions_data.items():
|
|
if len(self.sessions) >= self.max_sessions:
|
|
break
|
|
session = UserSession(
|
|
sdata["user_id"],
|
|
sdata["username"],
|
|
sdata.get("room", "The Threshold"),
|
|
)
|
|
session.messages = sdata.get("messages", [])
|
|
session.created_at = sdata.get("created_at", session.created_at)
|
|
session.last_active = sdata.get("last_active", time.time())
|
|
self.sessions[uid] = session
|
|
loaded += 1
|
|
if loaded > 0:
|
|
print(f"[SessionManager] Loaded {loaded} sessions from {SESSIONS_FILE}")
|
|
return loaded
|
|
except Exception as e:
|
|
print(f"[SessionManager] Failed to load sessions: {e}")
|
|
return 0
|
|
|
|
|
|
# ── HTTP API ───────────────────────────────────────────────────────────
|
|
|
|
session_manager = SessionManager()
|
|
presence_manager = PresenceManager()
|
|
notification_manager = NotificationManager()
|
|
_server_start_time = time.time()
|
|
|
|
# ── Latency Tracking ──────────────────────────────────────────────────
|
|
_latencies: list[dict] = []
|
|
_latencies_lock = threading.Lock()
|
|
_max_latencies = 1000
|
|
|
|
def record_latency(user_id: str, room: str, duration_ms: float):
|
|
"""Record a latency measurement."""
|
|
with _latencies_lock:
|
|
_latencies.append({
|
|
"user_id": user_id,
|
|
"room": room,
|
|
"duration_ms": round(duration_ms, 2),
|
|
"timestamp": datetime.now().isoformat(),
|
|
})
|
|
if len(_latencies) > _max_latencies:
|
|
del _latencies[:len(_latencies) - _max_latencies]
|
|
|
|
def get_latency_stats() -> dict:
|
|
"""Get aggregate latency statistics."""
|
|
with _latencies_lock:
|
|
if not _latencies:
|
|
return {"count": 0, "average_ms": 0, "min_ms": 0, "max_ms": 0}
|
|
durations = [e["duration_ms"] for e in _latencies]
|
|
return {
|
|
"count": len(durations),
|
|
"average_ms": round(sum(durations) / len(durations), 2),
|
|
"min_ms": round(min(durations), 2),
|
|
"max_ms": round(max(durations), 2),
|
|
"recent": _latencies[-10:],
|
|
}
|
|
|
|
class BridgeHandler(BaseHTTPRequestHandler):
|
|
"""HTTP handler for multi-user bridge."""
|
|
|
|
def do_GET(self):
|
|
if self.path == '/bridge/health':
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
ws = json.loads(state_file.read_text()) if state_file.exists() else {}
|
|
self._json_response({
|
|
"status": "ok",
|
|
"active_sessions": session_manager.get_session_count(),
|
|
"tick": ws.get("tick", 0),
|
|
"time_of_day": ws.get("time_of_day"),
|
|
"weather": ws.get("weather"),
|
|
"world_tick_running": world_tick_system._running,
|
|
"timestamp": datetime.now().isoformat(),
|
|
})
|
|
elif self.path == '/bridge/sessions':
|
|
self._json_response({
|
|
"sessions": session_manager.list_sessions(),
|
|
})
|
|
elif self.path.startswith('/bridge/world/'):
|
|
room = self.path.split('/bridge/world/')[-1]
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
if state_file.exists():
|
|
state = json.loads(state_file.read_text())
|
|
room_data = state.get('rooms', {}).get(room, {})
|
|
self._json_response({"room": room, "data": room_data})
|
|
else:
|
|
self._json_response({"room": room, "data": {}})
|
|
elif self.path.startswith('/bridge/room/') and self.path.endswith('/players'):
|
|
# GET /bridge/room/<room>/players
|
|
parts = self.path.split('/')
|
|
# ['', 'bridge', 'room', '<room>', 'players']
|
|
room = parts[3]
|
|
players = presence_manager.get_players_in_room(room)
|
|
self._json_response({"room": room, "players": players})
|
|
elif self.path.startswith('/bridge/room/') and self.path.endswith('/events'):
|
|
# GET /bridge/room/<room>/events[?since=<timestamp>]
|
|
parts = self.path.split('/')
|
|
room = parts[3]
|
|
from urllib.parse import urlparse, parse_qs
|
|
query = parse_qs(urlparse(self.path).query)
|
|
since = query.get('since', [None])[0]
|
|
events = presence_manager.get_room_events(room, since)
|
|
self._json_response({"room": room, "events": events})
|
|
elif self.path == '/bridge/world-state':
|
|
# GET /bridge/world-state — full world state
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
world_state = json.loads(state_file.read_text()) if state_file.exists() else {}
|
|
|
|
# Build room player lists and conversation counts
|
|
rooms = world_state.get('rooms', {})
|
|
room_details = {}
|
|
for room_name, room_data in rooms.items():
|
|
players = presence_manager.get_players_in_room(room_name)
|
|
# Count messages across all sessions in this room
|
|
with session_manager._lock:
|
|
conv_count = sum(
|
|
len(s.messages) for s in session_manager.sessions.values()
|
|
if s.room == room_name
|
|
)
|
|
room_details[room_name] = {
|
|
**room_data,
|
|
"players": players,
|
|
"timmy_conversation_count": conv_count,
|
|
}
|
|
|
|
self._json_response({
|
|
"world": world_state,
|
|
"rooms": room_details,
|
|
"active_sessions": session_manager.list_sessions(),
|
|
})
|
|
elif self.path == '/bridge/latency':
|
|
# GET /bridge/latency — latency stats
|
|
self._json_response(get_latency_stats())
|
|
elif self.path == '/bridge/stats':
|
|
# GET /bridge/stats — aggregate stats
|
|
total_messages = sum(len(s.messages) for s in session_manager.sessions.values())
|
|
# Rooms with at least one player
|
|
rooms_with_players = [
|
|
room for room in presence_manager._rooms
|
|
if presence_manager._rooms[room]
|
|
]
|
|
uptime_seconds = time.time() - _server_start_time
|
|
self._json_response({
|
|
"total_sessions": session_manager.get_session_count(),
|
|
"total_messages": total_messages,
|
|
"rooms_with_players": rooms_with_players,
|
|
"rooms_with_players_count": len(rooms_with_players),
|
|
"uptime_seconds": round(uptime_seconds, 1),
|
|
"timestamp": datetime.now().isoformat(),
|
|
})
|
|
elif self.path.startswith('/bridge/notifications/'):
|
|
# GET /bridge/notifications/<user_id>[?mark_read=false]
|
|
user_id = self.path.split('/bridge/notifications/')[-1].rstrip('/')
|
|
from urllib.parse import urlparse, parse_qs
|
|
query = parse_qs(urlparse(self.path).query)
|
|
mark_read = query.get('mark_read', ['true'])[0].lower() != 'false'
|
|
notifications = notification_manager.get_pending(user_id, mark_read=mark_read)
|
|
self._json_response({
|
|
"user_id": user_id,
|
|
"notifications": notifications,
|
|
"unread_count": notification_manager.get_unread_count(user_id),
|
|
})
|
|
elif self.path.startswith('/bridge/inventory/'):
|
|
# GET /bridge/inventory/<user_id> — list user's inventory items
|
|
user_id = self.path.split('/bridge/inventory/')[-1].rstrip('/')
|
|
items = inventory_manager.get_inventory(user_id)
|
|
self._json_response({
|
|
"user_id": user_id,
|
|
"items": items,
|
|
"count": len(items),
|
|
})
|
|
elif self.path.startswith('/bridge/quests/'):
|
|
# GET /bridge/quests/<user_id> — list active quests for user
|
|
user_id = self.path.split('/bridge/quests/')[-1].rstrip('/')
|
|
quests = quest_manager.get_user_quests(user_id)
|
|
available = quest_manager.get_available_quests()
|
|
self._json_response({
|
|
"user_id": user_id,
|
|
"active_quests": quests,
|
|
"available_quests": available,
|
|
})
|
|
elif self.path.startswith('/bridge/session/') and self.path.endswith('/summary'):
|
|
# GET /bridge/session/<user_id>/summary
|
|
parts = self.path.split('/')
|
|
# ['', 'bridge', 'session', '<user_id>', 'summary']
|
|
user_id = parts[3]
|
|
with session_manager._lock:
|
|
session = session_manager.sessions.get(user_id)
|
|
if session:
|
|
summary = session.get_summary()
|
|
self._json_response({
|
|
"user_id": user_id,
|
|
"username": session.username,
|
|
"room": session.room,
|
|
"message_count": len(session.messages),
|
|
"summary": summary,
|
|
})
|
|
else:
|
|
# Check saved summaries in JSONL
|
|
saved = None
|
|
if session_manager._summaries_path.exists():
|
|
for line in session_manager._summaries_path.read_text().splitlines():
|
|
try:
|
|
entry = json.loads(line)
|
|
if entry.get("user_id") == user_id:
|
|
saved = entry
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if saved:
|
|
self._json_response(saved)
|
|
else:
|
|
self._json_response({"error": "no session or saved summary"}, 404)
|
|
elif self.path.startswith('/bridge/combat/status/'):
|
|
# GET /bridge/combat/status/<room> — NPC health bars + active fights in room
|
|
room = self.path.split('/bridge/combat/status/')[-1].rstrip('/')
|
|
status = combat_manager.get_room_combat_status(room)
|
|
self._json_response({"room": room, "combat_status": status})
|
|
elif self.path == '/bridge/combat/npcs':
|
|
# GET /bridge/combat/npcs — list all NPCs
|
|
self._json_response({"npcs": combat_manager.get_all_npcs()})
|
|
elif self.path == '/bridge/guilds':
|
|
# GET /bridge/guilds — list all guilds
|
|
guilds = guild_manager.list_guilds()
|
|
self._json_response({"ok": True, "guilds": guilds})
|
|
|
|
elif self.path.startswith('/bridge/guild/') and self.path.endswith('/chat'):
|
|
# GET /bridge/guild/<guild_id>/chat[?since=<timestamp>]
|
|
parts = self.path.split('/')
|
|
guild_id = parts[3] if len(parts) >= 4 else ''
|
|
since = None
|
|
if '?' in self.path:
|
|
qs = self.path.split('?', 1)[1]
|
|
for param in qs.split('&'):
|
|
if param.startswith('since='):
|
|
since = param[6:]
|
|
result = guild_manager.get_guild_chat(guild_id, since=since)
|
|
if "error" in result:
|
|
self._json_response(result, 404)
|
|
else:
|
|
self._json_response(result)
|
|
|
|
elif self.path.startswith('/bridge/spells/'):
|
|
# GET /bridge/spells/<user_id> — list known spells for user
|
|
user_id = self.path.split('/bridge/spells/')[-1].rstrip('/')
|
|
spellbook = magic_manager.get_spellbook(user_id)
|
|
if spellbook:
|
|
self._json_response(spellbook)
|
|
else:
|
|
# Return all available spells if user has no spellbook yet
|
|
self._json_response({
|
|
"user_id": user_id,
|
|
"mana": 50,
|
|
"max_mana": 50,
|
|
"known_spells": [],
|
|
"available_spells": magic_manager.get_all_spells(),
|
|
})
|
|
else:
|
|
self._json_response({"error": "not found"}, 404)
|
|
|
|
def do_POST(self):
|
|
content_length = int(self.headers.get('Content-Length', 0))
|
|
body = json.loads(self.rfile.read(content_length)) if content_length else {}
|
|
|
|
if self.path == '/bridge/chat':
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
message = body.get('message', '')
|
|
room = body.get('room', 'The Threshold')
|
|
|
|
if not message:
|
|
self._json_response({"error": "no message"}, 400)
|
|
return
|
|
|
|
session = session_manager.get_or_create(user_id, username, room)
|
|
# Track presence — enter room if not already there
|
|
is_new_player = False
|
|
if not presence_manager.get_players_in_room(room) or \
|
|
not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)):
|
|
enter_event = presence_manager.enter_room(user_id, username, room)
|
|
is_new_player = True
|
|
# Auto-notify: new player joined
|
|
if enter_event:
|
|
notification_manager.broadcast_room(
|
|
room, "player_join",
|
|
f"{username} has entered {room}.",
|
|
exclude_user=user_id, data=enter_event)
|
|
# Plugin hook: on_join
|
|
plugin_msg = plugin_registry.fire_on_join(user_id, room)
|
|
if plugin_msg:
|
|
notification_manager.notify(user_id, "plugin", plugin_msg, room=room, username=username)
|
|
|
|
# Plugin hook: on_message (can override response)
|
|
plugin_override = plugin_registry.fire_on_message(user_id, message, room)
|
|
if plugin_override is not None:
|
|
response = plugin_override
|
|
else:
|
|
response = session.chat(message)
|
|
|
|
# Auto-notify: crisis detection — scan response for crisis protocol keywords
|
|
crisis_keywords = ["988", "741741", "safe right now", "crisis", "Crisis Text Line"]
|
|
if any(kw in response for kw in crisis_keywords):
|
|
notification_manager.notify(
|
|
user_id, "crisis",
|
|
"Crisis resources have been provided. You are not alone.",
|
|
room=room, username=username,
|
|
data={"response_contains_crisis_protocol": True})
|
|
# Also notify admins in the room
|
|
notification_manager.broadcast_room(
|
|
room, "crisis_alert",
|
|
f"Crisis detected for {username} in {room}.",
|
|
exclude_user=user_id,
|
|
data={"triggered_by": user_id, "room": room})
|
|
|
|
self._json_response({
|
|
"response": response,
|
|
"user": username,
|
|
"room": room,
|
|
"session_messages": len(session.messages),
|
|
})
|
|
|
|
elif self.path == '/bridge/move':
|
|
user_id = body.get('user_id')
|
|
new_room = body.get('room')
|
|
with session_manager._lock:
|
|
if user_id in session_manager.sessions:
|
|
session = session_manager.sessions[user_id]
|
|
old_room = session.room
|
|
# Leave old room
|
|
leave_event = presence_manager.leave_room(user_id, old_room)
|
|
# Auto-notify: player left
|
|
if leave_event:
|
|
notification_manager.broadcast_room(
|
|
old_room, "player_leave",
|
|
f"{session.username} has left {old_room}.",
|
|
exclude_user=user_id, data=leave_event)
|
|
# Plugin hook: on_leave
|
|
plugin_leave = plugin_registry.fire_on_leave(user_id, old_room)
|
|
if plugin_leave:
|
|
notification_manager.broadcast_room(
|
|
old_room, "plugin", plugin_leave,
|
|
exclude_user=user_id)
|
|
# Enter new room
|
|
session.room = new_room
|
|
enter_event = presence_manager.enter_room(user_id, session.username, new_room)
|
|
# Auto-notify: player joined
|
|
if enter_event:
|
|
notification_manager.broadcast_room(
|
|
new_room, "player_join",
|
|
f"{session.username} has entered {new_room}.",
|
|
exclude_user=user_id, data=enter_event)
|
|
self._json_response({
|
|
"ok": True,
|
|
"room": new_room,
|
|
"events": [e for e in [leave_event, enter_event] if e],
|
|
})
|
|
else:
|
|
self._json_response({"error": "no session"}, 404)
|
|
|
|
elif self.path == '/bridge/say':
|
|
# POST /bridge/say — user says something visible to all in room
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
message = body.get('message', '')
|
|
room = body.get('room', 'The Threshold')
|
|
|
|
if not message:
|
|
self._json_response({"error": "no message"}, 400)
|
|
return
|
|
|
|
event = presence_manager.say(user_id, username, room, message)
|
|
# Get list of players who should see it
|
|
players = presence_manager.get_players_in_room(room)
|
|
self._json_response({
|
|
"ok": True,
|
|
"event": event,
|
|
"recipients": players,
|
|
})
|
|
|
|
elif self.path == '/bridge/command':
|
|
# POST /bridge/command — parse MUD-style commands
|
|
# Body: { user_id, username, room, command }
|
|
# Commands: look, go <dir>, examine <obj>, say <msg>, ask <msg>
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
raw_command = body.get('command', '').strip()
|
|
|
|
if not raw_command:
|
|
self._json_response({"error": "no command"}, 400)
|
|
return
|
|
|
|
result = self._parse_mud_command(user_id, username, room, raw_command)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/quest/complete':
|
|
# POST /bridge/quest/complete — mark objective done
|
|
# Body: { user_id, quest_id, objective_index }
|
|
user_id = body.get('user_id')
|
|
quest_id = body.get('quest_id')
|
|
objective_index = body.get('objective_index')
|
|
if not user_id or not quest_id or objective_index is None:
|
|
self._json_response({"error": "user_id, quest_id, and objective_index required"}, 400)
|
|
return
|
|
result = quest_manager.complete_objective(quest_id, user_id, int(objective_index))
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/quest/assign':
|
|
# POST /bridge/quest/assign — assign a quest to a user
|
|
# Body: { user_id, quest_id }
|
|
user_id = body.get('user_id')
|
|
quest_id = body.get('quest_id')
|
|
if not user_id or not quest_id:
|
|
self._json_response({"error": "user_id and quest_id required"}, 400)
|
|
return
|
|
quest = quest_manager.assign(quest_id, user_id)
|
|
if quest:
|
|
self._json_response({"ok": True, "quest": quest.to_dict()})
|
|
else:
|
|
self._json_response({"error": "Quest not found"}, 404)
|
|
|
|
elif self.path == '/bridge/take':
|
|
# POST /bridge/take — pick up item from room
|
|
# Body: { user_id, username, room, item }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
item_name = body.get('item', '')
|
|
if not item_name:
|
|
self._json_response({"error": "item required"}, 400)
|
|
return
|
|
result = inventory_manager.take_item(user_id, username, room, item_name)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
# Notify others in room
|
|
notification_manager.broadcast_room(
|
|
room, "item_taken",
|
|
f"{username} picked up {result['item']['name']}.",
|
|
exclude_user=user_id, data=result)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/drop':
|
|
# POST /bridge/drop — drop item in room
|
|
# Body: { user_id, username, room, item }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
item_name = body.get('item', '')
|
|
if not item_name:
|
|
self._json_response({"error": "item required"}, 400)
|
|
return
|
|
result = inventory_manager.drop_item(user_id, username, room, item_name)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
# Notify others in room
|
|
notification_manager.broadcast_room(
|
|
room, "item_dropped",
|
|
f"{username} dropped {result['item']['name']}.",
|
|
exclude_user=user_id, data=result)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/combat/start':
|
|
# POST /bridge/combat/start — begin fight with NPC
|
|
# Body: { user_id, username, room, npc_id }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
npc_id = body.get('npc_id', '')
|
|
if not npc_id:
|
|
self._json_response({"error": "npc_id required"}, 400)
|
|
return
|
|
result = combat_manager.start_fight(user_id, username, room, npc_id)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
npc = result.get("npc", {})
|
|
notification_manager.broadcast_room(
|
|
room, "combat_start",
|
|
f"{username} has engaged {npc.get('name', 'an enemy')} in combat!",
|
|
exclude_user=user_id, data=result)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/combat/attack':
|
|
# POST /bridge/combat/attack — attack NPC in active fight
|
|
# Body: { user_id, username, room }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
result = combat_manager.attack(user_id)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
enc = result.get("encounter", {})
|
|
npc = result.get("npc", {})
|
|
loot = result.get("loot", [])
|
|
# Broadcast combat action to room
|
|
combat_msg = f"{username} strikes {npc.get('name', 'the enemy')} for {result.get('player_dmg', 0)} damage!"
|
|
notification_manager.broadcast_room(
|
|
room, "combat_action", combat_msg,
|
|
exclude_user=user_id, data=result)
|
|
# If NPC died, broadcast death
|
|
if not npc.get("alive", True):
|
|
death_msg = f"{npc.get('name', 'The enemy')} has been slain by {username}!"
|
|
if loot:
|
|
death_msg += f" Loot: {', '.join(i['name'] for i in loot)}"
|
|
notification_manager.broadcast_room(
|
|
room, "combat_death", death_msg, data=result)
|
|
# If player died
|
|
if enc.get("player_hp", 1) <= 0:
|
|
notification_manager.broadcast_room(
|
|
room, "combat_defeat",
|
|
f"{username} has been defeated in combat!", data=result)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/combat/defend':
|
|
# POST /bridge/combat/defend — defend (half damage next hit)
|
|
# Body: { user_id, username, room }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
result = combat_manager.defend(user_id)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
notification_manager.broadcast_room(
|
|
room, "combat_action",
|
|
f"{username} braces defensively.",
|
|
exclude_user=user_id, data=result)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/combat/flee':
|
|
# POST /bridge/combat/flee — flee from combat
|
|
# Body: { user_id, username, room }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
result = combat_manager.flee(user_id)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
notification_manager.broadcast_room(
|
|
room, "combat_flee",
|
|
f"{username} flees from combat!",
|
|
exclude_user=user_id, data=result)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/spell/cast':
|
|
# POST /bridge/spell/cast — cast a spell
|
|
# Body: { user_id, username, room, spell_id, target? }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
room = body.get('room', 'The Threshold')
|
|
spell_id = body.get('spell_id', '')
|
|
target = body.get('target')
|
|
if not spell_id:
|
|
self._json_response({"error": "spell_id required"}, 400)
|
|
return
|
|
result = magic_manager.cast_spell(user_id, username, room, spell_id, target)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
# Broadcast spell casting to room
|
|
notification_manager.broadcast_room(
|
|
room, "spell_cast",
|
|
result.get("message", f"{username} casts a spell!"),
|
|
exclude_user=user_id, data=result)
|
|
self._json_response(result)
|
|
|
|
elif self.path == '/bridge/spell/learn':
|
|
# POST /bridge/spell/learn — learn a new spell
|
|
# Body: { user_id, username, spell_id }
|
|
user_id = body.get('user_id', 'anonymous')
|
|
username = body.get('username', 'Anonymous')
|
|
spell_id = body.get('spell_id', '')
|
|
if not spell_id:
|
|
self._json_response({"error": "spell_id required"}, 400)
|
|
return
|
|
result = magic_manager.learn_spell(user_id, username, spell_id)
|
|
if "error" in result:
|
|
self._json_response(result, 400)
|
|
else:
|
|
self._json_response(result)
|
|
|
|
else:
|
|
self._json_response({"error": "not found"}, 404)
|
|
|
|
def _parse_mud_command(self, user_id: str, username: str, room: str, command: str) -> dict:
|
|
"""Parse and execute a MUD-style command."""
|
|
parts = command.split(None, 1)
|
|
verb = parts[0].lower() if parts else ''
|
|
arg = parts[1].strip() if len(parts) > 1 else ''
|
|
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
world_state = json.loads(state_file.read_text()) if state_file.exists() else {}
|
|
rooms = world_state.get('rooms', {})
|
|
room_data = rooms.get(room, {})
|
|
|
|
if verb == 'look':
|
|
# Return room description and scene
|
|
desc = room_data.get('description_base', 'An empty room.')
|
|
desc_dynamic = room_data.get('description_dynamic', '')
|
|
objects = room_data.get('objects', [])
|
|
players = presence_manager.get_players_in_room(room)
|
|
|
|
scene = [desc]
|
|
if desc_dynamic:
|
|
scene.append(desc_dynamic)
|
|
if objects:
|
|
scene.append(f"You see: {', '.join(objects)}.")
|
|
# Show items on the ground
|
|
ground_items = inventory_manager.get_room_items(room)
|
|
if ground_items:
|
|
item_names = [it["name"] for it in ground_items]
|
|
scene.append(f"On the ground: {', '.join(item_names)}.")
|
|
if players:
|
|
names = [p['username'] for p in players if p['user_id'] != user_id]
|
|
if names:
|
|
scene.append(f"Also here: {', '.join(names)}.")
|
|
# Build exits from description_base
|
|
# Try world_state exits first, fall back to description parsing
|
|
exits = room_data.get('exits', {})
|
|
if not exits:
|
|
exits = self._extract_exits(room_data.get('description_base', ''))
|
|
if exits:
|
|
scene.append(f"Exits: {', '.join(exits.keys())}.")
|
|
|
|
return {
|
|
"command": "look",
|
|
"room": room,
|
|
"description": "\n".join(scene),
|
|
"objects": objects,
|
|
"exits": exits,
|
|
"players": players,
|
|
}
|
|
|
|
elif verb == 'go':
|
|
if not arg:
|
|
return {"command": "go", "error": "Go where? Specify a direction."}
|
|
direction = arg.lower()
|
|
# Try world_state exits first, fall back to description parsing
|
|
exits = room_data.get('exits', {})
|
|
if not exits:
|
|
exits = self._extract_exits(room_data.get('description_base', ''))
|
|
# Match direction
|
|
dest = None
|
|
for dir_name, dest_room in exits.items():
|
|
if dir_name.lower().startswith(direction) or direction in dir_name.lower():
|
|
dest = dest_room
|
|
break
|
|
if not dest:
|
|
return {"command": "go", "error": f"You can't go '{arg}' from here. Exits: {', '.join(exits.keys()) if exits else 'none'}."}
|
|
if dest not in rooms:
|
|
return {"command": "go", "error": f"Unknown destination: {dest}."}
|
|
# Move user — auto-notify
|
|
leave_event = presence_manager.leave_room(user_id, room)
|
|
if leave_event:
|
|
notification_manager.broadcast_room(
|
|
room, "player_leave",
|
|
f"{username} has left {room}.",
|
|
exclude_user=user_id, data=leave_event)
|
|
enter_event = presence_manager.enter_room(user_id, username, dest)
|
|
if enter_event:
|
|
notification_manager.broadcast_room(
|
|
dest, "player_join",
|
|
f"{username} has entered {dest}.",
|
|
exclude_user=user_id, data=enter_event)
|
|
# Update session room
|
|
with session_manager._lock:
|
|
if user_id in session_manager.sessions:
|
|
session_manager.sessions[user_id].room = dest
|
|
# Return new room look
|
|
new_room_data = rooms[dest]
|
|
desc = new_room_data.get('description_base', 'An empty room.')
|
|
desc_dynamic = new_room_data.get('description_dynamic', '')
|
|
objects = new_room_data.get('objects', [])
|
|
players = presence_manager.get_players_in_room(dest)
|
|
scene = [f"You go {arg}.\n"]
|
|
scene.append(desc)
|
|
if desc_dynamic:
|
|
scene.append(desc_dynamic)
|
|
if objects:
|
|
scene.append(f"You see: {', '.join(objects)}.")
|
|
new_exits = self._extract_exits(desc)
|
|
if new_exits:
|
|
scene.append(f"Exits: {', '.join(new_exits.keys())}.")
|
|
return {
|
|
"command": "go",
|
|
"direction": arg,
|
|
"room": dest,
|
|
"description": "\n".join(scene),
|
|
"exits": new_exits,
|
|
}
|
|
|
|
elif verb == 'examine':
|
|
if not arg:
|
|
return {"command": "examine", "error": "Examine what?"}
|
|
objects = room_data.get('objects', [])
|
|
# Fuzzy match object
|
|
target = arg.lower()
|
|
matched = None
|
|
for obj in objects:
|
|
if target in obj.lower() or obj.lower() in target:
|
|
matched = obj
|
|
break
|
|
if not matched:
|
|
return {"command": "examine", "error": f"You don't see '{arg}' here. Objects: {', '.join(objects) if objects else 'none'}."}
|
|
# Check whiteboard
|
|
if matched == 'whiteboard':
|
|
wb = room_data.get('whiteboard', [])
|
|
wb_text = "\n".join(f" - {w}" for w in wb) if wb else "(empty)"
|
|
return {"command": "examine", "object": matched, "description": f"The whiteboard reads:\n{wb_text}"}
|
|
# Default descriptions for known objects
|
|
descriptions = {
|
|
"stone floor": "Smooth stone, worn by countless footsteps.",
|
|
"doorframe": "An archway of fitted stone, humming faintly with energy.",
|
|
"server racks": "Wrought-iron racks filled with humming servers. Green LEDs blink in the dim light.",
|
|
"green LED": "A single green LED, pulsing steadily. Heartbeat.",
|
|
"cot": "A simple cot in the corner. Someone sleeps here sometimes.",
|
|
"anvil": "A heavy iron anvil, scarred and dented from years of use.",
|
|
"hammer": "A well-worn hammer, its handle smooth from use.",
|
|
"tongs": "Iron tongs, blackened by the forge.",
|
|
"hearth": "The hearth glows with banked coals. Warmth radiates outward.",
|
|
"tools": "Hammers, tongs, chisels — all well-used.",
|
|
"stone bench": "A cool stone bench under the oak tree. A good place to sit.",
|
|
"oak tree": "An old oak tree, its branches spreading wide over the garden.",
|
|
"herbs": "Lavender, rosemary, thyme — fragrant and green.",
|
|
"wildflowers": "Small flowers in purple, yellow, and white.",
|
|
"railing": "The bridge railing is worn smooth. Carvings mark the stone.",
|
|
"dark water": "Dark water flows beneath the bridge. You cannot see the bottom.",
|
|
}
|
|
desc = descriptions.get(matched, f"You look at the {matched}. It seems ordinary enough.")
|
|
return {"command": "examine", "object": matched, "description": desc}
|
|
|
|
elif verb == 'say':
|
|
if not arg:
|
|
return {"command": "say", "error": "Say what?"}
|
|
event = presence_manager.say(user_id, username, room, arg)
|
|
players = presence_manager.get_players_in_room(room)
|
|
return {
|
|
"command": "say",
|
|
"message": arg,
|
|
"room": room,
|
|
"event": event,
|
|
"recipients": players,
|
|
}
|
|
|
|
elif verb == 'ask':
|
|
if not arg:
|
|
return {"command": "ask", "error": "Ask what?"}
|
|
# Forward to chat endpoint logic
|
|
session = session_manager.get_or_create(user_id, username, room)
|
|
if not presence_manager.get_players_in_room(room) or \
|
|
not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)):
|
|
presence_manager.enter_room(user_id, username, room)
|
|
response = session.chat(arg)
|
|
return {
|
|
"command": "ask",
|
|
"message": arg,
|
|
"room": room,
|
|
"response": response,
|
|
"session_messages": len(session.messages),
|
|
}
|
|
|
|
else:
|
|
return {
|
|
"command": verb,
|
|
"error": f"Unknown command: '{verb}'. Try: look, go <dir>, examine <obj>, say <msg>, ask <msg>",
|
|
}
|
|
|
|
def _extract_exits(self, description: str) -> dict:
|
|
"""Extract exits from room description like 'North to the Tower. East to the Garden.'"""
|
|
import re
|
|
exits = {}
|
|
# Match patterns like "North to the Tower", "East to the Garden"
|
|
pattern = r'(North|South|East|West|Up|Down|Northeast|Northwest|Southeast|Southwest)\s+to\s+(?:the\s+)?([A-Z][a-zA-Z\s]+?)(?:\.|$)'
|
|
for match in re.finditer(pattern, description):
|
|
direction = match.group(1)
|
|
destination = match.group(2).strip()
|
|
# Reconstruct full room name
|
|
room_name = destination if destination.startswith('The ') else f"The {destination}"
|
|
exits[direction] = room_name
|
|
return exits
|
|
|
|
def _json_response(self, data: dict, code: int = 200):
|
|
self.send_response(code)
|
|
self.send_header('Content-Type', 'application/json')
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(data).encode())
|
|
|
|
def log_message(self, format, *args):
|
|
pass # Suppress HTTP logs
|
|
|
|
|
|
# ── World Tick System ──────────────────────────────────────────────────
|
|
|
|
import random
|
|
|
|
class WorldTickSystem:
|
|
"""Advances the world state every TICK_INTERVAL seconds.
|
|
|
|
On each tick:
|
|
- Increment tick counter
|
|
- Cycle time_of_day (dawn -> morning -> midday -> afternoon -> dusk -> night)
|
|
- Randomly change weather (clear, cloudy, foggy, light rain, heavy rain, storm)
|
|
- Evolve room states:
|
|
* The Forge: fire decays (blazing -> burning -> glowing -> embers -> cold)
|
|
* The Garden: growth advances (seeds -> sprouts -> growing -> blooming -> harvest)
|
|
* The Bridge: rain toggles
|
|
- Broadcast world events to connected players
|
|
- Save world state to disk
|
|
"""
|
|
|
|
TICK_INTERVAL = 60 # seconds
|
|
|
|
TIME_CYCLE = ["dawn", "morning", "midday", "afternoon", "dusk", "night"]
|
|
|
|
WEATHER_OPTIONS = [
|
|
"clear", "clear", "clear", # weighted toward clear
|
|
"cloudy", "cloudy",
|
|
"foggy",
|
|
"light rain",
|
|
"heavy rain",
|
|
"storm",
|
|
]
|
|
|
|
FIRE_STAGES = ["cold", "embers", "glowing", "burning", "blazing"]
|
|
GROWTH_STAGES = ["seeds", "sprouts", "growing", "blooming", "harvest"]
|
|
|
|
def __init__(self):
|
|
self._thread: threading.Thread | None = None
|
|
self._running = False
|
|
self._state_lock = threading.Lock()
|
|
|
|
# ── lifecycle ───────────────────────────────────────────────────────
|
|
|
|
def start(self):
|
|
"""Start the tick loop in a daemon thread."""
|
|
if self._running:
|
|
return
|
|
self._running = True
|
|
self._thread = threading.Thread(target=self._loop, daemon=True, name="world-tick")
|
|
self._thread.start()
|
|
print("[WorldTick] Started — ticking every {}s".format(self.TICK_INTERVAL))
|
|
|
|
def stop(self):
|
|
self._running = False
|
|
|
|
# ── internal loop ───────────────────────────────────────────────────
|
|
|
|
def _loop(self):
|
|
while self._running:
|
|
time.sleep(self.TICK_INTERVAL)
|
|
try:
|
|
self._tick()
|
|
except Exception as e:
|
|
print(f"[WorldTick] Error during tick: {e}")
|
|
|
|
def _tick(self):
|
|
"""Execute one world tick."""
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
if state_file.exists():
|
|
state = json.loads(state_file.read_text())
|
|
else:
|
|
state = {"tick": 0, "time_of_day": "morning", "weather": "clear", "rooms": {}}
|
|
|
|
events: list[str] = []
|
|
|
|
with self._state_lock:
|
|
# 1. Advance tick counter
|
|
state["tick"] = state.get("tick", 0) + 1
|
|
tick_num = state["tick"]
|
|
|
|
# 2. Time of day — advance every tick
|
|
current_time = state.get("time_of_day", "morning")
|
|
try:
|
|
idx = self.TIME_CYCLE.index(current_time)
|
|
except ValueError:
|
|
idx = 0
|
|
next_idx = (idx + 1) % len(self.TIME_CYCLE)
|
|
new_time = self.TIME_CYCLE[next_idx]
|
|
state["time_of_day"] = new_time
|
|
if new_time == "dawn":
|
|
events.append("The sun rises over The Tower. A new day begins.")
|
|
elif new_time == "dusk":
|
|
events.append("The sky darkens. Dusk settles over the world.")
|
|
elif new_time == "night":
|
|
events.append("Night falls. The green LED in The Tower pulses in the dark.")
|
|
|
|
# 3. Weather — change every 3 ticks
|
|
if tick_num % 3 == 0:
|
|
old_weather = state.get("weather", "clear")
|
|
new_weather = random.choice(self.WEATHER_OPTIONS)
|
|
state["weather"] = new_weather
|
|
if new_weather != old_weather:
|
|
events.append(f"The weather shifts to {new_weather}.")
|
|
|
|
# 4. Room states
|
|
rooms = state.get("rooms", {})
|
|
|
|
# The Forge — fire decays one stage every 2 ticks unless rekindled
|
|
forge = rooms.get("The Forge", {})
|
|
if forge:
|
|
fire = forge.get("fire_state", "cold")
|
|
untouched = forge.get("fire_untouched_ticks", 0) + 1
|
|
forge["fire_untouched_ticks"] = untouched
|
|
if untouched >= 2 and fire != "cold":
|
|
try:
|
|
fi = self.FIRE_STAGES.index(fire)
|
|
except ValueError:
|
|
fi = 0
|
|
if fi > 0:
|
|
forge["fire_state"] = self.FIRE_STAGES[fi - 1]
|
|
forge["fire_untouched_ticks"] = 0
|
|
if forge["fire_state"] == "embers":
|
|
events.append("The forge fire has died down to embers.")
|
|
elif forge["fire_state"] == "cold":
|
|
events.append("The forge fire has gone cold.")
|
|
rooms["The Forge"] = forge
|
|
|
|
# The Garden — growth advances one stage every 4 ticks
|
|
garden = rooms.get("The Garden", {})
|
|
if garden:
|
|
if tick_num % 4 == 0:
|
|
growth = garden.get("growth_stage", "seeds")
|
|
try:
|
|
gi = self.GROWTH_STAGES.index(growth)
|
|
except ValueError:
|
|
gi = 0
|
|
if gi < len(self.GROWTH_STAGES) - 1:
|
|
garden["growth_stage"] = self.GROWTH_STAGES[gi + 1]
|
|
events.append(f"The garden has advanced to {garden['growth_stage']}.")
|
|
else:
|
|
# Reset cycle
|
|
garden["growth_stage"] = "seeds"
|
|
events.append("The garden has been harvested. New seeds are planted.")
|
|
rooms["The Garden"] = garden
|
|
|
|
# The Bridge — rain state follows weather
|
|
bridge = rooms.get("The Bridge", {})
|
|
if bridge:
|
|
weather = state.get("weather", "clear")
|
|
is_raining = weather in ("light rain", "heavy rain", "storm")
|
|
was_raining = bridge.get("rain_active", False)
|
|
bridge["rain_active"] = is_raining
|
|
if is_raining and not was_raining:
|
|
events.append("Rain begins to fall on The Bridge.")
|
|
elif not is_raining and was_raining:
|
|
events.append("The rain on The Bridge lets up.")
|
|
rooms["The Bridge"] = bridge
|
|
|
|
state["rooms"] = rooms
|
|
state["last_updated"] = datetime.now().isoformat()
|
|
|
|
# 5. Mana regeneration
|
|
magic_manager.regenerate_mana()
|
|
|
|
# 6. Save world state
|
|
state_file.write_text(json.dumps(state, indent=2))
|
|
|
|
# 6. Broadcast events to all occupied rooms (outside lock)
|
|
if events:
|
|
event_text = " ".join(events)
|
|
for room_name in list(presence_manager._rooms.keys()):
|
|
players = presence_manager.get_players_in_room(room_name)
|
|
if players:
|
|
notification_manager.broadcast_room(
|
|
room_name, "world_tick", event_text,
|
|
data={"tick": state["tick"], "time_of_day": state["time_of_day"],
|
|
"weather": state.get("weather"), "events": events},
|
|
)
|
|
print(f"[WorldTick] tick={state['tick']} time={state['time_of_day']} "
|
|
f"weather={state.get('weather')} events={len(events)}")
|
|
|
|
|
|
world_tick_system = WorldTickSystem()
|
|
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
print(f"Multi-User AI Bridge starting on {BRIDGE_HOST}:{BRIDGE_PORT}")
|
|
print(f"World dir: {WORLD_DIR}")
|
|
print(f"Max sessions: {session_manager.max_sessions}")
|
|
print()
|
|
print("Endpoints:")
|
|
print(f" GET /bridge/health — Health check")
|
|
print(f" GET /bridge/sessions — List active sessions")
|
|
print(f" GET /bridge/world-state — Full world state (rooms, players, convos)")
|
|
print(f" GET /bridge/stats — Aggregate stats (sessions, messages, uptime)")
|
|
print(f" GET /bridge/room/<room>/players — List players in a room")
|
|
print(f" GET /bridge/room/<room>/events — Room events (presence + chat)")
|
|
print(f" GET /bridge/session/<id>/summary — Get conversation summary")
|
|
print(f" POST /bridge/chat — Send message to Timmy")
|
|
print(f" POST /bridge/say — Say something to room (visible to all)")
|
|
print(f" POST /bridge/move — Move user to room (triggers presence)")
|
|
print(f" POST /bridge/command — MUD command parser (look, go, examine, say, ask)")
|
|
print()
|
|
|
|
# Start world tick system
|
|
world_tick_system.start()
|
|
|
|
server = HTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
|
|
server.serve_forever()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|