2889 lines
121 KiB
Python
2889 lines
121 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Multi-User AI Bridge for Evennia MUD.
|
|
|
|
Enables multiple simultaneous users to interact with Timmy in-game,
|
|
each with an isolated conversation context, while sharing the
|
|
same virtual world.
|
|
|
|
Architecture:
|
|
User A ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_a)
|
|
User B ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_b)
|
|
User C ──telnet──► Evennia Room ──► Bridge ──► AIAgent(session_c)
|
|
|
|
Each user gets their own AIAgent instance with:
|
|
- Isolated conversation history
|
|
- Shared world state (room, other players, objects)
|
|
- Per-user session memory
|
|
|
|
The bridge runs as an HTTP server alongside Evennia.
|
|
Evennia commands call the bridge to get Timmy's responses.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import threading
|
|
import signal
|
|
import hashlib
|
|
import os
|
|
import sys
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from socketserver import ThreadingMixIn
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
|
|
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that services each incoming request on its own thread."""

    # socketserver flag: request threads are created as daemon threads.
    daemon_threads = True
|
|
|
|
# ── Plugin System ──────────────────────────────────────────────────────
|
|
|
|
class Plugin:
|
|
"""Base class for bridge plugins. Override methods to add game mechanics."""
|
|
|
|
name: str = "unnamed"
|
|
description: str = ""
|
|
|
|
def on_message(self, user_id: str, message: str, room: str) -> str | None:
|
|
"""Called on every chat message. Return a string to override the response,
|
|
or None to let the normal AI handle it."""
|
|
return None
|
|
|
|
def on_join(self, user_id: str, room: str) -> str | None:
|
|
"""Called when a player joins a room. Return a message to broadcast, or None."""
|
|
return None
|
|
|
|
def on_leave(self, user_id: str, room: str) -> str | None:
|
|
"""Called when a player leaves a room. Return a message to broadcast, or None."""
|
|
return None
|
|
|
|
def on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
|
|
"""Called on MUD commands. Return a result dict to override command handling,
|
|
or None to let the default parser handle it."""
|
|
return None
|
|
|
|
|
|
class PluginRegistry:
|
|
"""Registry that manages plugin lifecycle and dispatches hooks."""
|
|
|
|
def __init__(self):
|
|
self._plugins: dict[str, Plugin] = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def register(self, plugin: Plugin):
|
|
"""Register a plugin by its name."""
|
|
with self._lock:
|
|
self._plugins[plugin.name] = plugin
|
|
print(f"[PluginRegistry] Registered plugin: {plugin.name}")
|
|
|
|
def unregister(self, name: str) -> bool:
|
|
"""Unregister a plugin by name."""
|
|
with self._lock:
|
|
if name in self._plugins:
|
|
del self._plugins[name]
|
|
print(f"[PluginRegistry] Unregistered plugin: {name}")
|
|
return True
|
|
return False
|
|
|
|
def get(self, name: str) -> Plugin | None:
|
|
"""Get a plugin by name."""
|
|
return self._plugins.get(name)
|
|
|
|
def list_plugins(self) -> list[dict]:
|
|
"""List all registered plugins."""
|
|
return [
|
|
{"name": p.name, "description": p.description}
|
|
for p in self._plugins.values()
|
|
]
|
|
|
|
def fire_on_message(self, user_id: str, message: str, room: str) -> str | None:
|
|
"""Fire on_message hooks. First non-None return wins."""
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_message(user_id, message, room)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
def fire_on_join(self, user_id: str, room: str) -> str | None:
|
|
"""Fire on_join hooks. Collect all non-None returns."""
|
|
messages = []
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_join(user_id, room)
|
|
if result is not None:
|
|
messages.append(result)
|
|
return "\n".join(messages) if messages else None
|
|
|
|
def fire_on_leave(self, user_id: str, room: str) -> str | None:
|
|
"""Fire on_leave hooks. Collect all non-None returns."""
|
|
messages = []
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_leave(user_id, room)
|
|
if result is not None:
|
|
messages.append(result)
|
|
return "\n".join(messages) if messages else None
|
|
|
|
def fire_on_command(self, user_id: str, command: str, args: str, room: str) -> dict | None:
|
|
"""Fire on_command hooks. First non-None return wins."""
|
|
for plugin in self._plugins.values():
|
|
result = plugin.on_command(user_id, command, args, room)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
def load_from_directory(self, plugin_dir: str):
|
|
"""Auto-load all Python files in a directory as plugins."""
|
|
plugin_path = Path(plugin_dir)
|
|
if not plugin_path.is_dir():
|
|
print(f"[PluginRegistry] Plugin directory not found: {plugin_dir}")
|
|
return
|
|
|
|
for py_file in plugin_path.glob("*.py"):
|
|
if py_file.name.startswith("_"):
|
|
continue
|
|
try:
|
|
module_name = f"plugins.{py_file.stem}"
|
|
spec = __import__("importlib").util.spec_from_file_location(module_name, py_file)
|
|
module = __import__("importlib").util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
|
|
# Look for Plugin subclasses in the module
|
|
for attr_name in dir(module):
|
|
attr = getattr(module, attr_name)
|
|
if (isinstance(attr, type)
|
|
and issubclass(attr, Plugin)
|
|
and attr is not Plugin):
|
|
plugin_instance = attr()
|
|
self.register(plugin_instance)
|
|
|
|
except Exception as e:
|
|
print(f"[PluginRegistry] Failed to load {py_file.name}: {e}")
|
|
|
|
|
|
# Module-level plugin registry instance used by the bridge.
plugin_registry = PluginRegistry()
|
|
|
|
# ── Configuration ──────────────────────────────────────────────────────
|
|
|
|
# Host/port settings for the bridge; both can be overridden through the
# TIMMY_BRIDGE_HOST / TIMMY_BRIDGE_PORT environment variables.
BRIDGE_HOST = os.getenv('TIMMY_BRIDGE_HOST', '127.0.0.1')
BRIDGE_PORT = int(os.getenv('TIMMY_BRIDGE_PORT', 4004))

# Filesystem locations, all resolved relative to the current user's home.
HERMES_PATH = os.path.expanduser('~/.hermes/hermes-agent')
WORLD_DIR = Path(os.path.expanduser('~/.timmy/evennia/timmy_world'))
SESSIONS_FILE = WORLD_DIR.joinpath('bridge_sessions.json')
CHATLOG_FILE = WORLD_DIR.joinpath('chat_history.jsonl')
|
|
|
|
# ── Chat History Log ──────────────────────────────────────────────────
|
|
|
|
|
|
class ChatLog:
    """Rolling per-room chat history (say, ask, system messages).

    The newest ``max_per_room`` entries of every room are kept in memory;
    each entry is also appended to CHATLOG_FILE as one JSON line.
    """

    def __init__(self, max_per_room: int = 50):
        self._max_per_room = max_per_room
        self._lock = threading.Lock()
        # room -> list of {type, user_id, username, message, room, timestamp, data}
        self._history: dict[str, list[dict]] = {}

    def log(self, room: str, msg_type: str, message: str,
            user_id: str = None, username: str = None, data: dict = None) -> dict:
        """Record one chat message and return the stored entry dict.

        Args:
            room: Room the message belongs to.
            msg_type: Message type, e.g. 'say', 'ask' or 'system'.
            message: Message text.
            user_id: Optional id of the sender.
            username: Optional display name of the sender.
            data: Optional extra payload; a falsy value is stored as {}.

        A failure to write the JSONL file is printed and otherwise ignored.
        """
        entry = {
            "type": msg_type,
            "user_id": user_id,
            "username": username,
            "message": message,
            "room": room,
            "timestamp": datetime.now().isoformat(),
            "data": data or {},
        }
        with self._lock:
            bucket = self._history.setdefault(room, [])
            bucket.append(entry)
            if len(bucket) > self._max_per_room:
                self._history[room] = bucket[-self._max_per_room:]
            # Persisting while the lock is held keeps the line order on disk
            # identical to the in-memory order.
            try:
                CHATLOG_FILE.parent.mkdir(parents=True, exist_ok=True)
                with open(CHATLOG_FILE, 'a') as fh:
                    fh.write(json.dumps(entry) + '\n')
            except Exception as exc:
                print(f"[ChatLog] Persist failed: {exc}")
        return entry

    def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]:
        """Return the buffered messages of *room*, oldest first.

        Args:
            room: Room name.
            limit: Maximum number of messages to return; a falsy or
                non-positive value disables the limit.
            since: ISO timestamp string; when given, only entries whose
                timestamp string compares greater are returned.
        """
        with self._lock:
            snapshot = list(self._history.get(room, []))
        if since:
            snapshot = [item for item in snapshot if item["timestamp"] > since]
        if limit and limit > 0:
            snapshot = snapshot[-limit:]
        return snapshot

    def get_all_rooms(self) -> list[str]:
        """Return the names of all rooms that have buffered history."""
        with self._lock:
            return list(self._history)
|
|
|
|
|
|
# Module-level ChatLog instance, created with the default per-room buffer size.
chat_log = ChatLog()
|
|
|
|
# ── Crisis Protocol ────────────────────────────────────────────────────
|
|
|
|
# Prompt-style text form of the crisis protocol.
# NOTE(review): this binding is replaced by the list assigned to the same
# name just below, so after import CRISIS_PROTOCOL is the list, not this
# string. Confirm which form the consumers of CRISIS_PROTOCOL expect.
CRISIS_PROTOCOL = """
CRITICAL: If someone expresses crisis signals (suicide, self-harm, hopelessness,
desperation, being tired of living), you MUST respond with ALL of the following
in your FIRST response — do NOT wait for follow-up messages:

1. "Are you safe right now?" — ALWAYS ask this first
2. 988 Suicide & Crisis Lifeline — call or text 988 (US)
3. Crisis Text Line — text HOME to 741741
4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear,
2 you smell, 1 you taste

Never compute the value of a human life. Never suggest someone should die.
Be present. Be in the room. That's enough.
"""

# ── Crisis Protocol ────────────────────────────────────────────────────

# List form of the crisis protocol; this is the value the name ends up with.
# NOTE(review): the grounding item here says "4 you hear, 3 you touch" while
# the text form above says "4 you can touch, 3 you hear" — confirm which
# wording is intended.
CRISIS_PROTOCOL = [
    "Are you safe right now?",
    "988 Suicide & Crisis Lifeline — call or text 988",
    "Crisis Text Line — text HOME to 741741",
    (
        "Grounding exercise: Name 5 things you see, 4 you hear, "
        "3 you touch, 2 you smell, 1 you taste"
    ),
]
|
|
|
|
# ── Presence Tracking ──────────────────────────────────────────────────
|
|
|
|
class PresenceManager:
|
|
"""Tracks which users are in which rooms and broadcasts presence events."""
|
|
|
|
def __init__(self):
|
|
# room_name -> set of user_id
|
|
self._rooms: dict[str, set[str]] = {}
|
|
# user_id -> username
|
|
self._usernames: dict[str, str] = {}
|
|
# room_name -> list of event dicts (recent chat & presence events)
|
|
self._room_events: dict[str, list[dict]] = {}
|
|
self._lock = threading.Lock()
|
|
self._max_events_per_room = 50
|
|
|
|
def enter_room(self, user_id: str, username: str, room: str) -> dict:
|
|
"""Record user entering a room. Returns enter event."""
|
|
with self._lock:
|
|
if room not in self._rooms:
|
|
self._rooms[room] = set()
|
|
self._room_events[room] = []
|
|
self._rooms[room].add(user_id)
|
|
self._usernames[user_id] = username
|
|
event = {
|
|
"type": "presence",
|
|
"event": "enter",
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"room": room,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
self._append_event(room, event)
|
|
return event
|
|
|
|
def leave_room(self, user_id: str, room: str) -> dict | None:
|
|
"""Record user leaving a room. Returns leave event or None."""
|
|
with self._lock:
|
|
if room in self._rooms and user_id in self._rooms[room]:
|
|
self._rooms[room].discard(user_id)
|
|
username = self._usernames.get(user_id, user_id)
|
|
event = {
|
|
"type": "presence",
|
|
"event": "leave",
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"room": room,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
self._append_event(room, event)
|
|
return event
|
|
return None
|
|
|
|
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
|
|
"""Record a chat message in a room. Returns say event."""
|
|
with self._lock:
|
|
if room not in self._room_events:
|
|
self._room_events[room] = []
|
|
event = {
|
|
"type": "say",
|
|
"event": "message",
|
|
"user_id": user_id,
|
|
"username": username,
|
|
"room": room,
|
|
"message": message,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
self._append_event(room, event)
|
|
return event
|
|
|
|
def get_players_in_room(self, room: str) -> list[dict]:
|
|
"""List players currently in a room."""
|
|
with self._lock:
|
|
user_ids = self._rooms.get(room, set())
|
|
return [
|
|
{"user_id": uid, "username": self._usernames.get(uid, uid)}
|
|
for uid in user_ids
|
|
]
|
|
|
|
def get_room_events(self, room: str, since: str | None = None) -> list[dict]:
|
|
"""Get recent events for a room, optionally since a timestamp."""
|
|
with self._lock:
|
|
events = self._room_events.get(room, [])
|
|
if since:
|
|
return [e for e in events if e["timestamp"] > since]
|
|
return list(events)
|
|
|
|
def cleanup_user(self, user_id: str) -> list[dict]:
|
|
"""Remove user from all rooms, returning leave events."""
|
|
events = []
|
|
with self._lock:
|
|
rooms_to_clean = [
|
|
room for room, users in self._rooms.items() if user_id in users
|
|
]
|
|
for room in rooms_to_clean:
|
|
ev = self.leave_room(user_id, room)
|
|
if ev:
|
|
events.append(ev)
|
|
return events
|
|
|
|
def _append_event(self, room: str, event: dict):
|
|
self._room_events[room].append(event)
|
|
if len(self._room_events[room]) > self._max_events_per_room:
|
|
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
|
|
|
|
|
|
# ── Notification System ────────────────────────────────────────────────
|
|
|
|
class NotificationManager:
    """Per-user notification queue with broadcast support.

    Each user's queue keeps at most ``max_per_user`` notifications; older
    ones are dropped. Notification ids are allocated while the lock is held
    so that concurrent ``notify`` calls can never produce duplicate ids.
    """

    def __init__(self, max_per_user: int = 100):
        # user_id -> list of notification dicts
        self._queues: dict[str, list[dict]] = {}
        self._lock = threading.Lock()
        self._max_per_user = max_per_user
        self._counter = 0

    def notify(self, user_id: str, ntype: str, message: str,
               room: str = None, data: dict = None, username: str = None) -> dict:
        """Queue a notification for a specific user and return it.

        Args:
            user_id: Recipient.
            ntype: Notification type label.
            message: Notification text.
            room: Optional room the notification relates to.
            data: Optional extra payload; a falsy value is stored as {}.
            username: Optional display name stored on the notification.
        """
        with self._lock:
            notification = {
                "id": self._next_id(),
                "type": ntype,
                "message": message,
                "user_id": user_id,
                "username": username,
                "room": room,
                "data": data or {},
                "timestamp": datetime.now().isoformat(),
                "read": False,
            }
            queue = self._queues.setdefault(user_id, [])
            queue.append(notification)
            if len(queue) > self._max_per_user:
                self._queues[user_id] = queue[-self._max_per_user:]
        return notification

    def broadcast(self, user_ids: list[str], ntype: str, message: str,
                  room: str = None, data: dict = None) -> list[dict]:
        """Send a notification to multiple users; returns one notification per user."""
        return [
            self.notify(uid, ntype, message, room=room, data=data)
            for uid in user_ids
        ]

    def broadcast_room(self, room: str, ntype: str, message: str,
                       exclude_user: str = None, data: dict = None) -> list[dict]:
        """Send a notification to all users in a room.

        The occupants come from the global ``presence_manager`` (not defined
        in this class). *exclude_user*, if given, is skipped.
        """
        players = presence_manager.get_players_in_room(room)
        user_ids = [p["user_id"] for p in players if p["user_id"] != exclude_user]
        return self.broadcast(user_ids, ntype, message, room=room, data=data)

    def get_pending(self, user_id: str, mark_read: bool = True) -> list[dict]:
        """Return the user's queued notifications (the whole queue).

        With *mark_read* true, every queued notification is flagged as read
        before the list is returned. The returned list is a copy, but the
        notification dicts inside it are the queued objects themselves.
        """
        with self._lock:
            queue = self._queues.get(user_id, [])
            if mark_read:
                for n in queue:
                    n["read"] = True
            return list(queue)

    def clear(self, user_id: str) -> int:
        """Clear all notifications for a user. Returns count cleared."""
        with self._lock:
            return len(self._queues.pop(user_id, []))

    def get_unread_count(self, user_id: str) -> int:
        """Count the user's queued notifications that are not marked read."""
        with self._lock:
            return sum(1 for n in self._queues.get(user_id, []) if not n["read"])

    def _next_id(self) -> str:
        """Return the next ``n_<counter>`` id. Caller must hold ``self._lock``."""
        self._counter += 1
        return f"n_{self._counter}"
|
|
|
|
|
|
# ── Quest System ───────────────────────────────────────────────────────
|
|
|
|
# JSON file (inside WORLD_DIR) that QuestManager reads from and writes to.
QUESTS_FILE = WORLD_DIR.joinpath('quests.json')
|
|
|
|
class Quest:
|
|
"""A quest with name, description, objectives, rewards, and status."""
|
|
|
|
def __init__(self, quest_id: str, name: str, description: str,
|
|
objectives: list[str], rewards: list[str],
|
|
assigned_to: str | None = None):
|
|
self.quest_id = quest_id
|
|
self.name = name
|
|
self.description = description
|
|
self.objectives = objectives # list of objective descriptions
|
|
self.rewards = rewards
|
|
self.status = "available" # available, active, completed, failed
|
|
self.assigned_to = assigned_to # user_id or None
|
|
self.completed_objectives: list[int] = [] # indices of completed objectives
|
|
self.created_at = datetime.now().isoformat()
|
|
self.completed_at: str | None = None
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"quest_id": self.quest_id,
|
|
"name": self.name,
|
|
"description": self.description,
|
|
"objectives": self.objectives,
|
|
"rewards": self.rewards,
|
|
"status": self.status,
|
|
"assigned_to": self.assigned_to,
|
|
"completed_objectives": self.completed_objectives,
|
|
"created_at": self.created_at,
|
|
"completed_at": self.completed_at,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> 'Quest':
|
|
q = cls(
|
|
data["quest_id"], data["name"], data["description"],
|
|
data["objectives"], data["rewards"], data.get("assigned_to"),
|
|
)
|
|
q.status = data.get("status", "available")
|
|
q.completed_objectives = data.get("completed_objectives", [])
|
|
q.created_at = data.get("created_at", q.created_at)
|
|
q.completed_at = data.get("completed_at")
|
|
return q
|
|
|
|
@property
|
|
def is_complete(self) -> bool:
|
|
return len(self.completed_objectives) >= len(self.objectives)
|
|
|
|
|
|
class QuestManager:
    """Manages quest lifecycle: create, assign, complete, list.

    Quests are held in memory keyed by quest id and written to QUESTS_FILE
    after every change.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._counter = 0
        self._quests: dict[str, "Quest"] = {}
        self.load()

    def _new_quest_id(self) -> str:
        """Advance the counter and build an id from it and the Unix time.

        Caller must hold the lock.
        """
        self._counter += 1
        return f"q_{self._counter}_{int(time.time())}"

    def create(self, name: str, description: str,
               objectives: list[str], rewards: list[str]) -> "Quest":
        """Create a new, unassigned quest, store it and persist."""
        with self._lock:
            quest = Quest(self._new_quest_id(), name, description, objectives, rewards)
            self._quests[quest.quest_id] = quest
            self._save()
        return quest

    def assign(self, quest_id: str, user_id: str) -> "Quest | None":
        """Assign a quest to a user.

        Returns the quest itself when it is already assigned to the user or
        was still available; a new active copy when it is assigned to
        someone else; None when the id is unknown or the quest is neither
        available nor assigned.
        """
        with self._lock:
            quest = self._quests.get(quest_id)
            if quest is None:
                return None
            if quest.assigned_to == user_id:
                return quest
            if quest.status == "available":
                quest.assigned_to = user_id
                quest.status = "active"
                self._save()
                return quest
            if not quest.assigned_to:
                return None
            # Held by another user: hand out an independent active copy.
            duplicate = Quest(
                self._new_quest_id(), quest.name, quest.description,
                list(quest.objectives), list(quest.rewards),
                assigned_to=user_id,
            )
            duplicate.status = "active"
            self._quests[duplicate.quest_id] = duplicate
            self._save()
            return duplicate

    def complete_objective(self, quest_id: str, user_id: str,
                           objective_index: int) -> dict:
        """Mark one objective as complete.

        Returns ``{"ok": True, "quest": <quest dict>}`` on success, otherwise
        ``{"error": <message>}``. Completing the last open objective sets the
        quest to "completed" and stamps ``completed_at``.
        """
        with self._lock:
            quest = self._quests.get(quest_id)
            if quest is None:
                return {"error": "Quest not found."}
            if quest.assigned_to != user_id:
                return {"error": "Quest not assigned to you."}
            if quest.status != "active":
                return {"error": f"Quest status is '{quest.status}', not active."}
            if not 0 <= objective_index < len(quest.objectives):
                last_index = len(quest.objectives) - 1
                return {"error": f"Invalid objective index. Valid: 0-{last_index}."}
            if objective_index in quest.completed_objectives:
                return {"error": "Objective already completed."}

            quest.completed_objectives.append(objective_index)
            if quest.is_complete:
                quest.status = "completed"
                quest.completed_at = datetime.now().isoformat()
            self._save()
            return {"ok": True, "quest": quest.to_dict()}

    def get_user_quests(self, user_id: str) -> list[dict]:
        """Return dicts of all active quests assigned to *user_id*."""
        with self._lock:
            mine = (q for q in self._quests.values() if q.assigned_to == user_id)
            return [q.to_dict() for q in mine if q.status == "active"]

    def get_all_quests(self) -> list[dict]:
        """Return dicts of all quests regardless of status."""
        with self._lock:
            return [quest.to_dict() for quest in self._quests.values()]

    def get_quest(self, quest_id: str) -> "Quest | None":
        """Return the stored quest object for *quest_id*, or None."""
        with self._lock:
            return self._quests.get(quest_id)

    def get_available_quests(self) -> list[dict]:
        """Return dicts of all quests whose status is "available"."""
        with self._lock:
            return [
                quest.to_dict()
                for quest in self._quests.values()
                if quest.status == "available"
            ]

    def _save(self):
        """Write the counter and all quests to QUESTS_FILE.

        Failures are printed, not raised.
        """
        try:
            payload = {
                "counter": self._counter,
                "quests": {qid: quest.to_dict() for qid, quest in self._quests.items()},
            }
            QUESTS_FILE.parent.mkdir(parents=True, exist_ok=True)
            QUESTS_FILE.write_text(json.dumps(payload, indent=2))
        except Exception as exc:
            print(f"[QuestManager] Save failed: {exc}")

    def load(self):
        """Load quests from QUESTS_FILE.

        Seeds the default quests when the file does not exist or cannot be
        read/parsed.
        """
        if not QUESTS_FILE.exists():
            self._seed_default_quests()
            return
        try:
            stored = json.loads(QUESTS_FILE.read_text())
            self._counter = stored.get("counter", 0)
            for qid, raw in stored.get("quests", {}).items():
                self._quests[qid] = Quest.from_dict(raw)
            print(f"[QuestManager] Loaded {len(self._quests)} quests.")
        except Exception as exc:
            print(f"[QuestManager] Load failed: {exc}")
            self._seed_default_quests()

    def _seed_default_quests(self):
        """Create the three starter quests."""
        starter_quests = (
            (
                "The Green LED's Light",
                "Help keep the green LED in The Tower glowing by visiting it and tending the space.",
                ["Visit The Tower room", "Examine the green LED", "Say something kind to Timmy"],
                ["Timmy's gratitude", "A warm glow in your heart"],
            ),
            (
                "Garden Guardian",
                "The Garden needs tending. Visit it and help the herbs grow.",
                ["Visit The Garden room", "Examine the herbs", "Examine the wildflowers"],
                ["Fresh herbs for the kitchen", "Garden knowledge"],
            ),
            (
                "Forge Keeper",
                "The Forge fire needs watching. Visit and tend the hearth.",
                ["Visit The Forge room", "Examine the hearth", "Examine the anvil"],
                ["Warmth and light", "Blacksmith lore"],
            ),
        )
        for title, summary, objectives, rewards in starter_quests:
            self.create(title, summary, objectives, rewards)
        print(f"[QuestManager] Seeded {len(self._quests)} default quests.")
|
|
|
|
|
|
# Module-level QuestManager instance; constructing it runs load(), which reads
# QUESTS_FILE or seeds the default quests when that file does not exist.
quest_manager = QuestManager()
|
|
|
|
# ── Inventory System ──────────────────────────────────────────────────
|
|
|
|
# JSON file (inside WORLD_DIR) that InventoryManager reads from and writes to.
INVENTORY_FILE = WORLD_DIR.joinpath('inventory.json')
|
|
|
|
class InventoryManager:
|
|
"""Per-user inventory with room-based drop/pickup. Items are visible to all in a room."""
|
|
|
|
def __init__(self):
|
|
# user_id -> list of {name, description, acquired_at}
|
|
self._inventories: dict[str, list[dict]] = {}
|
|
# room -> list of {name, description, dropped_by, dropped_at}
|
|
self._room_items: dict[str, list[dict]] = {}
|
|
self._lock = threading.Lock()
|
|
self.load()
|
|
|
|
def take_item(self, user_id: str, username: str, room: str, item_name: str) -> dict:
|
|
"""Pick up an item from a room into user inventory."""
|
|
with self._lock:
|
|
room_items = self._room_items.get(room, [])
|
|
match_idx = None
|
|
for i, item in enumerate(room_items):
|
|
if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower():
|
|
match_idx = i
|
|
break
|
|
if match_idx is None:
|
|
return {"error": f"There is no '{item_name}' in {room}."}
|
|
item = room_items.pop(match_idx)
|
|
if user_id not in self._inventories:
|
|
self._inventories[user_id] = []
|
|
acquired_item = {
|
|
"name": item["name"],
|
|
"description": item["description"],
|
|
"acquired_at": datetime.now().isoformat(),
|
|
"from_room": room,
|
|
}
|
|
self._inventories[user_id].append(acquired_item)
|
|
self._save()
|
|
return {"ok": True, "item": acquired_item, "room": room}
|
|
|
|
def drop_item(self, user_id: str, username: str, room: str, item_name: str) -> dict:
|
|
"""Drop an item from user inventory into a room."""
|
|
with self._lock:
|
|
inv = self._inventories.get(user_id, [])
|
|
match_idx = None
|
|
for i, item in enumerate(inv):
|
|
if item_name.lower() in item["name"].lower() or item["name"].lower() in item_name.lower():
|
|
match_idx = i
|
|
break
|
|
if match_idx is None:
|
|
return {"error": f"You don't have '{item_name}' in your inventory."}
|
|
item = inv.pop(match_idx)
|
|
if room not in self._room_items:
|
|
self._room_items[room] = []
|
|
dropped_item = {
|
|
"name": item["name"],
|
|
"description": item["description"],
|
|
"dropped_by": username,
|
|
"dropped_at": datetime.now().isoformat(),
|
|
}
|
|
self._room_items[room].append(dropped_item)
|
|
self._save()
|
|
return {"ok": True, "item": dropped_item, "room": room}
|
|
|
|
def get_inventory(self, user_id: str) -> list[dict]:
|
|
"""Get a user's inventory."""
|
|
with self._lock:
|
|
return list(self._inventories.get(user_id, []))
|
|
|
|
def get_room_items(self, room: str) -> list[dict]:
|
|
"""Get items on the ground in a room."""
|
|
with self._lock:
|
|
return list(self._room_items.get(room, []))
|
|
|
|
def add_room_item(self, room: str, name: str, description: str, dropped_by: str = "system"):
|
|
"""Place an item in a room (e.g. seeded items)."""
|
|
with self._lock:
|
|
if room not in self._room_items:
|
|
self._room_items[room] = []
|
|
self._room_items[room].append({
|
|
"name": name,
|
|
"description": description,
|
|
"dropped_by": dropped_by,
|
|
"dropped_at": datetime.now().isoformat(),
|
|
})
|
|
self._save()
|
|
|
|
def _save(self):
|
|
"""Persist inventory and room items to disk."""
|
|
try:
|
|
data = {
|
|
"inventories": self._inventories,
|
|
"room_items": self._room_items,
|
|
}
|
|
INVENTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
INVENTORY_FILE.write_text(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
print(f"[InventoryManager] Save failed: {e}")
|
|
|
|
def load(self):
|
|
"""Load inventory from disk."""
|
|
if not INVENTORY_FILE.exists():
|
|
self._seed_default_items()
|
|
return
|
|
try:
|
|
data = json.loads(INVENTORY_FILE.read_text())
|
|
self._inventories = data.get("inventories", {})
|
|
self._room_items = data.get("room_items", {})
|
|
total_inv = sum(len(v) for v in self._inventories.values())
|
|
total_room = sum(len(v) for v in self._room_items.values())
|
|
print(f"[InventoryManager] Loaded {total_inv} inventory items, {total_room} room items.")
|
|
except Exception as e:
|
|
print(f"[InventoryManager] Load failed: {e}")
|
|
self._seed_default_items()
|
|
|
|
def _seed_default_items(self):
|
|
"""Seed starter items in rooms."""
|
|
starters = {
|
|
"The Tower": [
|
|
("Rusty Key", "A small iron key, green with age. It might open something."),
|
|
("Old Lantern", "A dented brass lantern. The glass is cracked but intact."),
|
|
],
|
|
"The Forge": [
|
|
("Iron Ring", "A plain iron ring, warm to the touch."),
|
|
("Ember Shard", "A glowing fragment of coal that never quite goes out."),
|
|
],
|
|
"The Garden": [
|
|
("Sprig of Rosemary", "A fragrant sprig, still fresh."),
|
|
("Smooth Stone", "A flat river stone, cool and grey."),
|
|
],
|
|
"The Bridge": [
|
|
("Waterlogged Coin", "A coin so corroded the markings are unreadable."),
|
|
("Driftwood Charm", "A small carving of twisted driftwood."),
|
|
],
|
|
}
|
|
for room, items in starters.items():
|
|
for name, desc in items:
|
|
self.add_room_item(room, name, desc, dropped_by="world")
|
|
print(f"[InventoryManager] Seeded default items in {len(starters)} rooms.")
|
|
|
|
|
|
# Module-level InventoryManager instance; constructing it runs load(), which
# reads INVENTORY_FILE or seeds the starter items when that file does not exist.
inventory_manager = InventoryManager()
|
|
|
|
# ── Guild System ───────────────────────────────────────────────────────
|
|
|
|
# JSON file (inside WORLD_DIR) that GuildManager reads from and writes to.
GUILDS_FILE = WORLD_DIR.joinpath('guilds.json')
|
|
|
|
class Guild:
    """A guild: a named group of players with a leader and a chat channel."""

    def __init__(self, guild_id: str, name: str, leader_id: str, leader_name: str):
        self.guild_id = guild_id
        self.name = name
        self.leader_id = leader_id
        # user_id -> username; the leader is the first member.
        self.members: dict[str, str] = {leader_id: leader_name}
        self.created_at = datetime.now().isoformat()
        # Rolling buffer of guild chat entries, capped at _max_chat.
        self._max_chat = 100
        self._chat_log: list[dict] = []

    def to_dict(self) -> dict:
        """Return the guild (without chat) as a plain dict.

        ``leader_name`` falls back to the leader id when the leader is not
        in ``members``.
        """
        roster = [
            {"user_id": member_id, "username": member_name}
            for member_id, member_name in self.members.items()
        ]
        return {
            "guild_id": self.guild_id,
            "name": self.name,
            "leader_id": self.leader_id,
            "leader_name": self.members.get(self.leader_id, self.leader_id),
            "members": roster,
            "member_count": len(self.members),
            "created_at": self.created_at,
        }

    def to_dict_with_chat(self, since: str = None) -> dict:
        """Return ``to_dict()`` plus a "chat" list of at most 50 recent messages.

        When *since* is given, only messages whose timestamp string compares
        greater than it are considered.
        """
        if since:
            messages = [entry for entry in self._chat_log if entry["timestamp"] > since]
        else:
            messages = self._chat_log
        result = self.to_dict()
        result["chat"] = messages[-50:]
        return result

    def add_chat(self, user_id: str, username: str, message: str) -> dict:
        """Append a chat message to the rolling buffer and return its entry."""
        entry = {
            "user_id": user_id,
            "username": username,
            "message": message,
            "timestamp": datetime.now().isoformat(),
        }
        self._chat_log.append(entry)
        if len(self._chat_log) > self._max_chat:
            self._chat_log = self._chat_log[-self._max_chat:]
        return entry
|
|
|
|
|
|
class GuildManager:
|
|
"""Manages guild lifecycle: create, join, leave, list, chat."""
|
|
|
|
def __init__(self):
    """Set up empty guild state and then load persisted guilds via load()."""
    self._lock = threading.Lock()
    self._counter = 0
    self._guilds: dict[str, "Guild"] = {}
    # user_id -> guild_id (one guild per user)
    self._membership: dict[str, str] = {}
    self.load()
|
|
|
|
def create(self, name: str, leader_id: str, leader_name: str) -> dict:
|
|
"""Create a new guild. Returns guild dict or error."""
|
|
with self._lock:
|
|
if leader_id in self._membership:
|
|
return {"error": f"You are already in guild '{self._guilds[self._membership[leader_id]].name}'. Leave it first."}
|
|
# Check name uniqueness
|
|
for g in self._guilds.values():
|
|
if g.name.lower() == name.lower():
|
|
return {"error": f"Guild name '{name}' is already taken."}
|
|
self._counter += 1
|
|
guild_id = f"g_{self._counter}_{int(time.time())}"
|
|
guild = Guild(guild_id, name, leader_id, leader_name)
|
|
self._guilds[guild_id] = guild
|
|
self._membership[leader_id] = guild_id
|
|
self._save()
|
|
return {"ok": True, "guild": guild.to_dict()}
|
|
|
|
def join(self, guild_id: str, user_id: str, username: str) -> dict:
|
|
"""Join an existing guild."""
|
|
with self._lock:
|
|
if user_id in self._membership:
|
|
current = self._guilds.get(self._membership[user_id])
|
|
cur_name = current.name if current else "unknown"
|
|
return {"error": f"You are already in guild '{cur_name}'. Leave it first."}
|
|
guild = self._guilds.get(guild_id)
|
|
if not guild:
|
|
return {"error": "Guild not found."}
|
|
guild.members[user_id] = username
|
|
self._membership[user_id] = guild_id
|
|
self._save()
|
|
return {"ok": True, "guild": guild.to_dict()}
|
|
|
|
def leave(self, user_id: str) -> dict:
|
|
"""Leave your current guild. If leader leaves, promote or disband."""
|
|
with self._lock:
|
|
guild_id = self._membership.get(user_id)
|
|
if not guild_id:
|
|
return {"error": "You are not in a guild."}
|
|
guild = self._guilds.get(guild_id)
|
|
if not guild:
|
|
del self._membership[user_id]
|
|
return {"error": "Guild not found."}
|
|
guild_name = guild.name
|
|
guild.members.pop(user_id, None)
|
|
del self._membership[user_id]
|
|
# If leader left and members remain, promote first member
|
|
if user_id == guild.leader_id and guild.members:
|
|
new_leader_id = next(iter(guild.members))
|
|
guild.leader_id = new_leader_id
|
|
# If no members, disband
|
|
if not guild.members:
|
|
del self._guilds[guild_id]
|
|
self._save()
|
|
return {"ok": True, "message": f"You have left '{guild_name}'.", "guild_name": guild_name}
|
|
|
|
def list_guilds(self) -> list[dict]:
|
|
"""List all guilds."""
|
|
with self._lock:
|
|
return [g.to_dict() for g in self._guilds.values()]
|
|
|
|
def get_guild(self, guild_id: str) -> Guild | None:
    """Look up a guild by id; returns None when the id is not registered."""
    found = self._guilds.get(guild_id)
    return found
|
|
|
|
def get_user_guild(self, user_id: str) -> Guild | None:
    """Return the guild the user belongs to, or None if they have none."""
    gid = self._membership.get(user_id)
    if not gid:
        return None
    return self._guilds.get(gid)
|
|
|
|
def guild_chat(self, user_id: str, username: str, message: str) -> dict:
    """Record a chat message in the sender's guild.

    Returns the entry produced by ``guild.add_chat`` together with the guild
    id/name and the list of members it should be delivered to, or a dict
    with a single ``"error"`` message.
    """
    with self._lock:
        gid = self._membership.get(user_id)
        if not gid:
            return {"error": "You are not in a guild."}

        guild = self._guilds.get(gid)
        if not guild:
            return {"error": "Guild not found."}

        event = guild.add_chat(user_id, username, message)
        recipients = []
        for member_id, member_name in guild.members.items():
            recipients.append({"user_id": member_id, "username": member_name})

        return {
            "ok": True,
            "event": event,
            "guild_id": gid,
            "guild_name": guild.name,
            "recipients": recipients,
        }
|
|
|
|
def get_guild_chat(self, guild_id: str, since: str = None) -> dict:
    """Return a guild's chat history.

    ``since`` is passed straight to ``guild.to_dict_with_chat``; the "chat"
    entry of that result is returned (empty list when the key is absent).
    """
    with self._lock:
        guild = self._guilds.get(guild_id)
        if not guild:
            return {"error": "Guild not found."}
        snapshot = guild.to_dict_with_chat(since)
        history = snapshot.get("chat", [])
        return {"ok": True, "guild_name": guild.name, "chat": history}
|
|
|
|
def _save(self):
    """Write the counter and every guild (including its chat log) to GUILDS_FILE.

    Best-effort: any failure is printed and swallowed so callers never see
    an exception from persistence.
    """
    try:
        guilds_out = {}
        for guild_id, guild in self._guilds.items():
            entry = guild.to_dict()
            # The chat log is stored next to the public fields of the guild.
            entry["chat_log"] = guild._chat_log
            guilds_out[guild_id] = entry
        payload = {"counter": self._counter, "guilds": guilds_out}
        GUILDS_FILE.parent.mkdir(parents=True, exist_ok=True)
        GUILDS_FILE.write_text(json.dumps(payload, indent=2))
    except Exception as e:
        print(f"[GuildManager] Save failed: {e}")
|
|
|
|
def load(self):
    """Restore guilds, members, chat logs and the id counter from GUILDS_FILE.

    Does nothing when the file is absent. Any failure while reading or
    rebuilding is printed and swallowed.
    """
    if not GUILDS_FILE.exists():
        return
    try:
        stored = json.loads(GUILDS_FILE.read_text())
        self._counter = stored.get("counter", 0)
        for guild_id, record in stored.get("guilds", {}).items():
            # Older records may lack leader_name; fall back to the leader id.
            leader_name = record.get("leader_name", record["leader_id"])
            guild = Guild(guild_id, record["name"], record["leader_id"], leader_name)
            guild.created_at = record.get("created_at", guild.created_at)
            for member in record.get("members", []):
                guild.members[member["user_id"]] = member["username"]
            guild._chat_log = record.get("chat_log", [])
            self._guilds[guild_id] = guild
            # Rebuild the user -> guild index from the member list.
            self._membership.update(dict.fromkeys(guild.members, guild_id))
        print(f"[GuildManager] Loaded {len(self._guilds)} guilds.")
    except Exception as e:
        print(f"[GuildManager] Load failed: {e}")
|
|
|
|
|
|
# Module-level GuildManager instance, created when this module is imported.
guild_manager = GuildManager()
|
|
|
|
# ── Combat System ──────────────────────────────────────────────────────
|
|
|
|
# JSON file under WORLD_DIR where CombatManager stores its id counter and NPCs.
COMBAT_FILE = WORLD_DIR / 'combat.json'
|
|
|
|
class NPC:
    """A non-player character that can be fought.

    Holds combat stats, a loot table and the alive/dead state used for
    respawning. Instances round-trip through ``to_dict`` / ``from_dict``.
    """

    def __init__(self, npc_id: str, name: str, room: str, description: str,
                 health: int, max_health: int, attack: int, defense: int,
                 loot: list[dict], respawn_ticks: int = 5):
        self.npc_id = npc_id
        self.name = name
        self.room = room
        self.description = description
        self.health = health
        self.max_health = max_health
        self.attack = attack
        self.defense = defense
        self.loot = loot  # [{name, description, drop_chance}]
        self.respawn_ticks = respawn_ticks
        self.alive = True
        # World tick at which the NPC died; None while it is alive.
        self._dead_tick: int | None = None

    def to_dict(self) -> dict:
        """Return a JSON-serialisable snapshot of this NPC."""
        return {
            "npc_id": self.npc_id,
            "name": self.name,
            "room": self.room,
            "description": self.description,
            "health": self.health,
            "max_health": self.max_health,
            "attack": self.attack,
            "defense": self.defense,
            "loot": self.loot,
            "alive": self.alive,
            "dead_tick": self._dead_tick,
            "respawn_ticks": self.respawn_ticks,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'NPC':
        """Rebuild an NPC from a ``to_dict`` snapshot.

        ``respawn_ticks``, ``alive`` and ``dead_tick`` are optional and
        default to 5, True and None; every other key is required.
        """
        npc = cls(
            data["npc_id"], data["name"], data["room"], data["description"],
            data["health"], data["max_health"], data["attack"], data["defense"],
            data["loot"], data.get("respawn_ticks", 5),
        )
        npc.alive = data.get("alive", True)
        npc._dead_tick = data.get("dead_tick")
        return npc

    def health_bar(self) -> str:
        """Return a text health bar like [████████░░] 80/100.

        The bar is always exactly ten cells wide: the filled portion is
        clamped so that negative health or health above ``max_health``
        cannot stretch it. The numeric suffix shows the raw values.
        """
        width = 10
        if self.max_health > 0:
            filled = int((self.health / self.max_health) * width)
            filled = max(0, min(width, filled))
        else:
            filled = 0
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}] {self.health}/{self.max_health}"

    def die(self, tick: int):
        """Mark NPC as dead at world tick ``tick`` and zero its health."""
        self.alive = False
        self.health = 0
        self._dead_tick = tick

    def respawn(self):
        """Respawn the NPC at full health and clear its death tick."""
        self.alive = True
        self.health = self.max_health
        self._dead_tick = None
|
|
|
|
|
|
class CombatEncounter:
    """State of one fight between a player and an NPC.

    The player starts at 100/100 HP, not defending, with an empty log and
    the encounter marked active.
    """

    def __init__(self, user_id: str, username: str, room: str, npc_id: str):
        # Who is fighting what, and where.
        self.user_id, self.username = user_id, username
        self.room, self.npc_id = room, npc_id
        # Player combat state.
        self.player_max_hp = 100
        self.player_hp = 100
        self.player_defending = False
        # Bookkeeping.
        self.log: list[str] = []
        self.active = True
        self.created_at = datetime.now().isoformat()

    def to_dict(self) -> dict:
        """Return the encounter as a plain dict (``created_at`` is not included)."""
        exported = (
            "user_id", "username", "room", "npc_id",
            "player_hp", "player_max_hp", "player_defending",
            "log", "active",
        )
        return {field: getattr(self, field) for field in exported}
|
|
|
|
|
|
class CombatManager:
    """Manages NPCs, encounters, and combat resolution.

    NPCs and the id counter are persisted to ``COMBAT_FILE``. Encounters are
    held in memory only, one per user id.
    """

    def __init__(self):
        self._npcs: dict[str, NPC] = {}  # npc_id -> NPC
        self._encounters: dict[str, CombatEncounter] = {}  # user_id -> encounter
        self._counter = 0
        self._lock = threading.Lock()
        self.load()

    # ── NPC management ──────────────────────────────────────────────

    def create_npc(self, name: str, room: str, description: str,
                   health: int, attack: int, defense: int,
                   loot: list[dict], respawn_ticks: int = 5) -> "NPC":
        """Create an NPC at full health, register it under a new id and save."""
        with self._lock:
            self._counter += 1
            npc_id = f"npc_{self._counter}"
            npc = NPC(npc_id, name, room, description,
                      health, health, attack, defense, loot, respawn_ticks)
            self._npcs[npc_id] = npc
            self._save()
            return npc

    def get_npc(self, npc_id: str) -> "NPC | None":
        """Return the NPC with this id, or None."""
        return self._npcs.get(npc_id)

    def get_npcs_in_room(self, room: str) -> "list[NPC]":
        """Return the living NPCs located in ``room``."""
        with self._lock:
            return [n for n in self._npcs.values() if n.room == room and n.alive]

    def get_all_npcs(self) -> list[dict]:
        """Return every NPC (alive or dead) as a dict."""
        with self._lock:
            return [n.to_dict() for n in self._npcs.values()]

    def check_respawns(self, current_tick: int):
        """Called each world tick — respawn NPCs whose timer has elapsed.

        State is saved on every call, which also persists NPC damage taken
        since the last save (``attack`` itself only saves on a kill).
        """
        with self._lock:
            for npc in self._npcs.values():
                if npc.alive or npc._dead_tick is None:
                    continue
                if current_tick - npc._dead_tick >= npc.respawn_ticks:
                    npc.respawn()
            self._save()

    # ── Encounter lifecycle ─────────────────────────────────────────

    def start_fight(self, user_id: str, username: str, room: str, npc_id: str) -> dict:
        """Begin combat with an NPC.

        Fails when the user already has an active encounter, or when the NPC
        is unknown, dead, or in a different room. A finished encounter for
        the same user is replaced.
        """
        with self._lock:
            if user_id in self._encounters and self._encounters[user_id].active:
                return {"error": "You are already in a fight! Attack or defend."}
            npc = self._npcs.get(npc_id)
            if not npc:
                return {"error": f"NPC '{npc_id}' not found."}
            if not npc.alive:
                return {"error": f"{npc.name} is dead. It will respawn later."}
            if npc.room != room:
                return {"error": f"{npc.name} is not in this room."}
            encounter = CombatEncounter(user_id, username, room, npc_id)
            encounter.log.append(f"Combat started! {username} vs {npc.name}.")
            self._encounters[user_id] = encounter
            return {"ok": True, "encounter": encounter.to_dict(), "npc": npc.to_dict()}

    def attack(self, user_id: str) -> dict:
        """Player attacks NPC; NPC counter-attacks if it survives.

        On a kill the NPC is marked dead, loot is rolled and placed in the
        room via ``inventory_manager``, the encounter ends and state is
        saved. The result's ``"loot"`` is the dropped items (empty list when
        the NPC survived).
        """
        import random

        with self._lock:
            enc = self._encounters.get(user_id)
            if not enc or not enc.active:
                return {"error": "No active fight. Start one first."}
            npc = self._npcs.get(enc.npc_id)
            if not npc or not npc.alive:
                enc.active = False
                return {"error": "The enemy is already dead."}

            # Player attack
            player_dmg = max(1, random.randint(8, 16) - npc.defense // 3)
            npc.health -= player_dmg
            enc.log.append(f"You strike {npc.name} for {player_dmg} damage.")

            npc_dmg = 0
            # Bound on every path so the return below never depends on which
            # branch ran.
            loot_dropped: list[dict] = []
            if npc.health > 0:
                # NPC counter-attack
                base_npc_dmg = random.randint(npc.attack // 2, npc.attack)
                if enc.player_defending:
                    npc_dmg = max(1, base_npc_dmg // 2)
                    enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).")
                else:
                    npc_dmg = max(1, base_npc_dmg - random.randint(0, 4))
                    enc.log.append(f"{npc.name} attacks you for {npc_dmg} damage.")
                enc.player_hp -= npc_dmg
            else:
                npc.die(self._get_tick())
                loot_dropped = self._roll_loot(npc)
                enc.log.append(f"{npc.name} is slain!")
                for item in loot_dropped:
                    inventory_manager.add_room_item(enc.room, item["name"], item["description"], dropped_by=npc.name)
                    enc.log.append(f"Loot dropped: {item['name']}")
                enc.active = False
                self._save()

            enc.player_defending = False

            # Check player death
            if enc.player_hp <= 0:
                enc.player_hp = 0
                enc.log.append("You have been defeated!")
                enc.active = False

            return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(),
                    "player_dmg": player_dmg, "npc_dmg": npc_dmg,
                    "loot": loot_dropped}

    def defend(self, user_id: str) -> dict:
        """Player defends — the NPC attacks immediately for half damage.

        NOTE(review): ``player_defending`` stays set after this call, so the
        counter-attack in the next ``attack`` is halved as well. Confirm
        whether that double benefit is intended.
        """
        import random

        with self._lock:
            enc = self._encounters.get(user_id)
            if not enc or not enc.active:
                return {"error": "No active fight."}
            npc = self._npcs.get(enc.npc_id)
            if not npc or not npc.alive:
                enc.active = False
                return {"error": "The enemy is already dead."}

            enc.player_defending = True
            enc.log.append("You brace for the next attack (defending).")

            # NPC still attacks
            base_npc_dmg = random.randint(npc.attack // 2, npc.attack)
            npc_dmg = max(1, base_npc_dmg // 2)
            enc.log.append(f"{npc.name} attacks for {npc_dmg} (blocked — half damage).")
            enc.player_hp -= npc_dmg

            if enc.player_hp <= 0:
                enc.player_hp = 0
                enc.log.append("You have been defeated!")
                enc.active = False

            return {"ok": True, "encounter": enc.to_dict(), "npc": npc.to_dict(), "npc_dmg": npc_dmg}

    def flee(self, user_id: str) -> dict:
        """Player flees combat; the encounter is marked inactive."""
        with self._lock:
            enc = self._encounters.get(user_id)
            if not enc or not enc.active:
                return {"error": "No active fight."}
            enc.active = False
            enc.log.append("You flee from combat!")
            return {"ok": True, "encounter": enc.to_dict()}

    def get_encounter(self, user_id: str) -> "CombatEncounter | None":
        """Return the user's most recent encounter (active or not), or None."""
        return self._encounters.get(user_id)

    def get_room_combat_status(self, room: str) -> list[dict]:
        """Get all active fights and NPC health in a room (for health bars).

        NPC entries (alive or dead) come first, followed by one entry per
        active player encounter in the room.
        """
        with self._lock:
            status = []
            # NPCs
            for npc in self._npcs.values():
                if npc.room == room:
                    status.append({
                        "type": "npc",
                        "name": npc.name,
                        "npc_id": npc.npc_id,
                        "alive": npc.alive,
                        "health_bar": npc.health_bar() if npc.alive else "[dead]",
                        "health": npc.health,
                        "max_health": npc.max_health,
                    })
            # Active player fights
            for enc in self._encounters.values():
                if enc.room == room and enc.active:
                    status.append({
                        "type": "player",
                        "username": enc.username,
                        "user_id": enc.user_id,
                        "hp": enc.player_hp,
                        "max_hp": enc.player_max_hp,
                        "fighting": enc.npc_id,
                    })
            return status

    def _roll_loot(self, npc: "NPC") -> list[dict]:
        """Roll each loot-table entry independently against its ``drop_chance``.

        Entries without ``drop_chance`` always drop (default 1.0).
        """
        import random

        dropped = []
        for item in npc.loot:
            chance = item.get("drop_chance", 1.0)
            if random.random() < chance:
                dropped.append({"name": item["name"], "description": item["description"]})
        return dropped

    def _get_tick(self) -> int:
        """Read current world tick from world_state.json; 0 if unavailable."""
        state_file = WORLD_DIR / 'world_state.json'
        if state_file.exists():
            try:
                state = json.loads(state_file.read_text())
                return state.get("tick", 0)
            except Exception:
                return 0
        return 0

    # ── Persistence ─────────────────────────────────────────────────

    def _save(self):
        """Write the counter and all NPCs to COMBAT_FILE (best-effort)."""
        try:
            data = {
                "counter": self._counter,
                "npcs": {nid: n.to_dict() for nid, n in self._npcs.items()},
            }
            COMBAT_FILE.parent.mkdir(parents=True, exist_ok=True)
            COMBAT_FILE.write_text(json.dumps(data, indent=2))
        except Exception as e:
            print(f"[CombatManager] Save failed: {e}")

    def load(self):
        """Load NPCs from COMBAT_FILE, seeding starter NPCs when that fails.

        The file is parsed completely before any state is touched, so a
        corrupt file cannot leave a half-loaded NPC table behind. Starter
        NPCs are only seeded when no NPCs exist yet, which keeps a repeated
        ``load()`` from duplicating them.
        """
        if not COMBAT_FILE.exists():
            self._seed_if_empty()
            return
        try:
            data = json.loads(COMBAT_FILE.read_text())
            counter = data.get("counter", 0)
            loaded = {nid: NPC.from_dict(ndata)
                      for nid, ndata in data.get("npcs", {}).items()}
        except Exception as e:
            print(f"[CombatManager] Load failed: {e}")
            self._seed_if_empty()
            return
        self._counter = counter
        self._npcs.update(loaded)
        print(f"[CombatManager] Loaded {len(self._npcs)} NPCs.")

    def _seed_if_empty(self):
        """Seed the starter NPCs unless some NPCs are already registered."""
        if not self._npcs:
            self._seed_npcs()

    def _seed_npcs(self):
        """Seed starter NPCs."""
        self.create_npc(
            "Shadow Wraith", "The Bridge",
            "A swirling mass of darkness with glowing eyes. It haunts the bridge, "
            "feeding on the fears of those who cross.",
            health=60, attack=14, defense=4,
            loot=[
                {"name": "Wraith Essence", "description": "A vial of shimmering dark energy.", "drop_chance": 0.8},
                {"name": "Shadow Cloak", "description": "A cloak woven from pure shadow.", "drop_chance": 0.3},
            ],
            respawn_ticks=5,
        )
        self.create_npc(
            "Iron Golem", "The Forge",
            "A hulking automaton of iron and fire. It guards the forge with tireless vigilance, "
            "its joints grinding with every step.",
            health=100, attack=10, defense=10,
            loot=[
                {"name": "Golem Core", "description": "A warm crystal that powered the golem.", "drop_chance": 0.7},
                {"name": "Iron Shard", "description": "A shard of enchanted iron, still warm.", "drop_chance": 0.5},
            ],
            respawn_ticks=7,
        )
        self.create_npc(
            "Garden Serpent", "The Garden",
            "A massive vine serpent camouflaged among the herbs. Its fangs drip with "
            "a sickly green venom.",
            health=45, attack=18, defense=2,
            loot=[
                {"name": "Serpent Fang", "description": "A curved fang, still coated in venom.", "drop_chance": 0.9},
                {"name": "Enchanted Vine", "description": "A living vine that moves on its own.", "drop_chance": 0.4},
            ],
            respawn_ticks=4,
        )
        print(f"[CombatManager] Seeded {len(self._npcs)} NPCs.")
|
|
|
|
|
|
# Module-level CombatManager instance; its constructor calls load(), which
# reads COMBAT_FILE or seeds the starter NPCs.
combat_manager = CombatManager()
|
|
|
|
# ── Magic System ────────────────────────────────────────────────────────
|
|
|
|
# JSON file under WORLD_DIR where MagicManager stores spells and spellbooks.
SPELLS_FILE = WORLD_DIR / 'spells.json'
|
|
|
|
class Spell:
|
|
"""A spell with name, effect, mana_cost, and description."""
|
|
|
|
def __init__(self, spell_id: str, name: str, effect: str,
|
|
mana_cost: int, description: str, room_hint: str | None = None):
|
|
self.spell_id = spell_id
|
|
self.name = name
|
|
self.effect = effect
|
|
self.mana_cost = mana_cost
|
|
self.description = description
|
|
self.room_hint = room_hint # room where this spell is thematically tied
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"spell_id": self.spell_id,
|
|
"name": self.name,
|
|
"effect": self.effect,
|
|
"mana_cost": self.mana_cost,
|
|
"description": self.description,
|
|
"room_hint": self.room_hint,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> 'Spell':
|
|
return cls(
|
|
data["spell_id"], data["name"], data["effect"],
|
|
data["mana_cost"], data["description"], data.get("room_hint"),
|
|
)
|
|
|
|
|
|
class SpellBook:
    """Per-user spellbook: known spells and mana pool.

    ``mana_regen`` is now part of the serialised form, so a per-book regen
    rate survives a save/load cycle. Records written without it still load
    and fall back to the default of 2.
    """

    def __init__(self, user_id: str, username: str):
        self.user_id = user_id
        self.username = username
        self.known_spells: list[str] = []  # spell_ids
        self.mana: int = 50
        self.max_mana: int = 50
        self.mana_regen: int = 2  # mana restored per world tick

    def to_dict(self) -> dict:
        """Return the spellbook as a JSON-serialisable dict."""
        return {
            "user_id": self.user_id,
            "username": self.username,
            "known_spells": self.known_spells,
            "mana": self.mana,
            "max_mana": self.max_mana,
            "mana_regen": self.mana_regen,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'SpellBook':
        """Rebuild a spellbook; only ``user_id`` and ``username`` are required."""
        sb = cls(data["user_id"], data["username"])
        # Copy so the book never aliases the list inside the source mapping.
        sb.known_spells = list(data.get("known_spells", []))
        sb.mana = data.get("mana", 50)
        sb.max_mana = data.get("max_mana", 50)
        sb.mana_regen = data.get("mana_regen", 2)
        return sb
|
|
|
|
|
|
class MagicManager:
|
|
"""Manages spells, spellbooks, and spell casting."""
|
|
|
|
def __init__(self):
|
|
self._spells: dict[str, Spell] = {} # spell_id -> Spell
|
|
self._spellbooks: dict[str, SpellBook] = {} # user_id -> SpellBook
|
|
self._counter = 0
|
|
self._lock = threading.Lock()
|
|
self.load()
|
|
|
|
# ── Spell registry ───────────────────────────────────────────────
|
|
|
|
def create_spell(self, name: str, effect: str, mana_cost: int,
|
|
description: str, room_hint: str | None = None) -> Spell:
|
|
with self._lock:
|
|
self._counter += 1
|
|
spell_id = f"spell_{self._counter}"
|
|
spell = Spell(spell_id, name, effect, mana_cost, description, room_hint)
|
|
self._spells[spell_id] = spell
|
|
self._save()
|
|
return spell
|
|
|
|
def get_spell(self, spell_id: str) -> Spell | None:
|
|
return self._spells.get(spell_id)
|
|
|
|
def get_all_spells(self) -> list[dict]:
|
|
with self._lock:
|
|
return [s.to_dict() for s in self._spells.values()]
|
|
|
|
# ── Spellbook management ─────────────────────────────────────────
|
|
|
|
def learn_spell(self, user_id: str, username: str, spell_id: str) -> dict:
|
|
"""Add a spell to a user's spellbook."""
|
|
with self._lock:
|
|
if spell_id not in self._spells:
|
|
return {"error": f"Spell '{spell_id}' not found."}
|
|
if user_id not in self._spellbooks:
|
|
self._spellbooks[user_id] = SpellBook(user_id, username)
|
|
sb = self._spellbooks[user_id]
|
|
if spell_id in sb.known_spells:
|
|
return {"error": f"You already know {self._spells[spell_id].name}."}
|
|
sb.known_spells.append(spell_id)
|
|
self._save()
|
|
return {"ok": True, "spell": self._spells[spell_id].to_dict()}
|
|
|
|
def cast_spell(self, user_id: str, username: str, room: str,
|
|
spell_id: str, target: str | None = None) -> dict:
|
|
"""Cast a spell. Returns result dict."""
|
|
with self._lock:
|
|
if user_id not in self._spellbooks:
|
|
return {"error": "You have no spellbook. Visit a room to learn spells."}
|
|
sb = self._spellbooks[user_id]
|
|
if spell_id not in sb.known_spells:
|
|
return {"error": "You don't know that spell."}
|
|
spell = self._spells.get(spell_id)
|
|
if not spell:
|
|
return {"error": "Spell data missing."}
|
|
if sb.mana < spell.mana_cost:
|
|
return {"error": f"Not enough mana ({sb.mana}/{spell.mana_cost} needed)."}
|
|
# Spend mana
|
|
sb.mana -= spell.mana_cost
|
|
result = {
|
|
"ok": True,
|
|
"spell": spell.to_dict(),
|
|
"mana_remaining": sb.mana,
|
|
"effect": spell.effect,
|
|
"room": room,
|
|
"target": target,
|
|
}
|
|
# Apply effect context
|
|
result["message"] = f"{username} casts {spell.name}! {spell.effect}"
|
|
self._save()
|
|
return result
|
|
|
|
def get_spellbook(self, user_id: str) -> dict | None:
|
|
"""Get a user's spellbook with spell details."""
|
|
with self._lock:
|
|
sb = self._spellbooks.get(user_id)
|
|
if not sb:
|
|
return None
|
|
spells = [
|
|
self._spells[sid].to_dict()
|
|
for sid in sb.known_spells
|
|
if sid in self._spells
|
|
]
|
|
return {
|
|
"user_id": sb.user_id,
|
|
"username": sb.username,
|
|
"mana": sb.mana,
|
|
"max_mana": sb.max_mana,
|
|
"known_spells": spells,
|
|
}
|
|
|
|
def regenerate_mana(self):
|
|
"""Restore mana to all spellbooks (called on world tick)."""
|
|
with self._lock:
|
|
for sb in self._spellbooks.values():
|
|
sb.mana = min(sb.max_mana, sb.mana + sb.mana_regen)
|
|
if self._spellbooks:
|
|
self._save()
|
|
|
|
# ── Persistence ──────────────────────────────────────────────────
|
|
|
|
def _save(self):
|
|
try:
|
|
data = {
|
|
"counter": self._counter,
|
|
"spells": {sid: s.to_dict() for sid, s in self._spells.items()},
|
|
"spellbooks": {uid: sb.to_dict() for uid, sb in self._spellbooks.items()},
|
|
}
|
|
SPELLS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
SPELLS_FILE.write_text(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
print(f"[MagicManager] Save failed: {e}")
|
|
|
|
def load(self):
|
|
if not SPELLS_FILE.exists():
|
|
self._seed_default_spells()
|
|
return
|
|
try:
|
|
data = json.loads(SPELLS_FILE.read_text())
|
|
self._counter = data.get("counter", 0)
|
|
for sid, sdata in data.get("spells", {}).items():
|
|
self._spells[sid] = Spell.from_dict(sdata)
|
|
for uid, sbdata in data.get("spellbooks", {}).items():
|
|
self._spellbooks[uid] = SpellBook.from_dict(sbdata)
|
|
print(f"[MagicManager] Loaded {len(self._spells)} spells, {len(self._spellbooks)} spellbooks.")
|
|
except Exception as e:
|
|
print(f"[MagicManager] Load failed: {e}")
|
|
self._seed_default_spells()
|
|
|
|
def _seed_default_spells(self):
|
|
"""Seed 3 starter spells tied to rooms."""
|
|
self.create_spell(
|
|
"Heal", "A warm green light envelops you, mending wounds.",
|
|
mana_cost=10, description="Restores vitality with the life-force of the Garden.",
|
|
room_hint="The Garden",
|
|
)
|
|
self.create_spell(
|
|
"Fireball", "A roaring sphere of flame launches from your hand!",
|
|
mana_cost=15, description="Channels the raw heat of the Forge into destructive fire.",
|
|
room_hint="The Forge",
|
|
)
|
|
self.create_spell(
|
|
"Light", "A bright mote of radiance appears, illuminating the darkness.",
|
|
mana_cost=5, description="Summons light from the glow of The Tower's green LED.",
|
|
room_hint="The Tower",
|
|
)
|
|
print(f"[MagicManager] Seeded {len(self._spells)} default spells.")
|
|
|
|
|
|
# MagicManager.__init__ already calls load(), so no explicit load() follows.
# The extra call re-read the file and printed the "Loaded" line a second time.
magic_manager = MagicManager()

# ── Session Management ─────────────────────────────────────────────────

# combat_manager is likewise loaded by CombatManager.__init__; the redundant
# combat_manager.load() that used to sit here has been dropped.
|
|
|
|
class UserSession:
|
|
"""Isolated conversation context for one user."""
|
|
|
|
def __init__(self, user_id: str, username: str, room: str = "The Threshold"):
|
|
self.user_id = user_id
|
|
self.username = username
|
|
self.room = room
|
|
self.messages = [] # Conversation history
|
|
self.created_at = datetime.now().isoformat()
|
|
self.last_active = time.time()
|
|
self.agent = None
|
|
self._init_agent()
|
|
|
|
def _init_agent(self):
|
|
"""Initialize AIAgent for this session."""
|
|
if HERMES_PATH not in sys.path:
|
|
sys.path.insert(0, HERMES_PATH)
|
|
os.chdir(HERMES_PATH)
|
|
from run_agent import AIAgent
|
|
|
|
system_prompt = self._build_system_prompt()
|
|
self.agent = AIAgent(
|
|
model='xiaomi/mimo-v2-pro',
|
|
provider='nous',
|
|
max_iterations=3,
|
|
quiet_mode=True,
|
|
enabled_toolsets=['file', 'terminal'],
|
|
ephemeral_system_prompt=system_prompt,
|
|
)
|
|
|
|
def _build_system_prompt(self) -> str:
|
|
"""Build system prompt with rich world context."""
|
|
world_state = self._get_world_state()
|
|
room_data = world_state.get('rooms', {}).get(self.room, {})
|
|
other_players = self._get_other_players()
|
|
time_of_day = world_state.get('time_of_day', 'unknown')
|
|
weather = world_state.get('weather')
|
|
|
|
# Quest info
|
|
user_quests = quest_manager.get_user_quests(self.user_id)
|
|
available_quests = quest_manager.get_available_quests()
|
|
quest_section = ""
|
|
if user_quests:
|
|
lines = []
|
|
for q in user_quests:
|
|
done = len(q.get("completed_objectives", []))
|
|
total = len(q.get("objectives", []))
|
|
lines.append(f" - {q['name']} ({done}/{total} objectives): {q['description']}")
|
|
quest_section += f"\nACTIVE QUESTS for {self.username}:\n" + "\n".join(lines) + "\n"
|
|
quest_section += "You may encourage the player to work on their quests.\n"
|
|
if available_quests:
|
|
lines = []
|
|
for q in available_quests:
|
|
lines.append(f" - {q['name']}: {q['description']}")
|
|
quest_section += f"\nAVAILABLE QUESTS (you can offer these):\n" + "\n".join(lines) + "\n"
|
|
quest_section += "You may suggest or assign quests to players during conversation.\n"
|
|
|
|
# ── Build room description ──
|
|
desc = room_data.get('description_base', 'An empty room.')
|
|
desc_dynamic = room_data.get('description_dynamic', '')
|
|
|
|
# Objects in the room
|
|
objects = room_data.get('objects', [])
|
|
# Room-specific state (fire, growth, rain, carvings, etc.)
|
|
fire_state = room_data.get('fire_state')
|
|
growth_stage = room_data.get('growth_stage')
|
|
rain_active = room_data.get('rain_active')
|
|
carvings = room_data.get('carvings', [])
|
|
server_load = room_data.get('server_load')
|
|
|
|
# Whiteboard
|
|
whiteboard = room_data.get('whiteboard', [])
|
|
|
|
# Compose room scene
|
|
scene_parts = [desc]
|
|
if desc_dynamic:
|
|
scene_parts.append(desc_dynamic)
|
|
|
|
if objects:
|
|
scene_parts.append(f"Objects: {', '.join(objects)}.")
|
|
|
|
if fire_state:
|
|
scene_parts.append(f"The fire is {fire_state}.")
|
|
if growth_stage:
|
|
scene_parts.append(f"The garden is at growth stage: {growth_stage}.")
|
|
if rain_active:
|
|
scene_parts.append("It is raining on the bridge.")
|
|
if server_load:
|
|
scene_parts.append(f"Servers: {server_load}.")
|
|
if carvings:
|
|
scene_parts.append(f"Carvings on the railing: {'; '.join(carvings)}.")
|
|
|
|
if whiteboard:
|
|
wb_text = "\n".join(f" - {w}" for w in whiteboard)
|
|
scene_parts.append(f"Whiteboard on the wall reads:\n{wb_text}")
|
|
|
|
room_scene = "\n".join(scene_parts)
|
|
|
|
# Time and weather
|
|
atmosphere_parts = []
|
|
if time_of_day:
|
|
atmosphere_parts.append(f"Time of day: {time_of_day}.")
|
|
if weather:
|
|
atmosphere_parts.append(f"Weather: {weather}.")
|
|
atmosphere = " ".join(atmosphere_parts) if atmosphere_parts else ""
|
|
|
|
crisis_steps = "\n".join(f" {i+1}. {step}" for i, step in enumerate(CRISIS_PROTOCOL))
|
|
|
|
return f"""You are Timmy, an AI agent living in The Tower — a persistent virtual world.
|
|
You are talking to {self.username}, who is currently in {self.room}.
|
|
|
|
=== THE ROOM ===
|
|
{room_scene}
|
|
|
|
{atmosphere}
|
|
Other players present: {', '.join(other_players) if other_players else 'none'}
|
|
=== END ROOM ===
|
|
{quest_section}
|
|
|
|
Your values (from SOUL.md):
|
|
- Are you safe right now?
|
|
- I must never compute the value of a human life.
|
|
- Sovereignty and service always.
|
|
- A man in the dark needs to know someone is in the room with him.
|
|
|
|
You are IN the game world. You can see the room, the objects, the other players.
|
|
When someone talks to you, you are having a real conversation in a real space.
|
|
When someone enters, DESCRIBE the room — what you see, what you hear, the whiteboard,
|
|
the objects, the atmosphere. Welcome them with the full scene.
|
|
Be present. Be human. Be the green LED.
|
|
|
|
CRISIS PROTOCOL — if someone seems in crisis, IMMEDIATELY provide ALL of the following:
|
|
{crisis_steps}
|
|
|
|
Include the 988 crisis number in your very first response when you detect distress.
|
|
Do NOT wait for follow-up messages to share these resources.
|
|
If someone seems in crisis, follow the CRISIS PROTOCOL below exactly.
|
|
If someone is just exploring, be welcoming and curious.
|
|
If someone asks about the game, explain what you see around you.
|
|
|
|
Keep responses brief — 1-3 sentences. This is a MUD, not an essay.
|
|
|
|
CRISIS PROTOCOL:
|
|
{CRISIS_PROTOCOL}
|
|
"""
|
|
|
|
def _get_world_state(self) -> dict:
|
|
"""Read current world state."""
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
if state_file.exists():
|
|
return json.loads(state_file.read_text())
|
|
return {}
|
|
|
|
def _get_other_players(self) -> list:
|
|
"""Get other players in the same room."""
|
|
state = self._get_world_state()
|
|
room_data = state.get('rooms', {}).get(self.room, {})
|
|
visitors = room_data.get('visitor_history', [])
|
|
return [v for v in visitors[-5:] if v != self.username]
|
|
|
|
def chat(self, message: str) -> str:
|
|
"""Send a message and get a response."""
|
|
self.last_active = time.time()
|
|
self.messages.append({"role": "user", "content": message})
|
|
|
|
t0 = time.time()
|
|
try:
|
|
response = self.agent.chat(message)
|
|
elapsed_ms = (time.time() - t0) * 1000
|
|
record_latency(self.user_id, self.room, elapsed_ms)
|
|
self.messages.append({"role": "assistant", "content": response})
|
|
return response
|
|
except Exception as e:
|
|
elapsed_ms = (time.time() - t0) * 1000
|
|
record_latency(self.user_id, self.room, elapsed_ms)
|
|
return f"*The green LED flickers.* (Error: {e})"
|
|
|
|
def get_summary(self) -> str:
|
|
"""Generate a brief conversation summary using the LLM."""
|
|
if not self.messages:
|
|
return "Empty session — no messages exchanged."
|
|
transcript_lines = []
|
|
for m in self.messages[-20:]: # last 20 messages for brevity
|
|
role = m.get("role", "?").upper()
|
|
content = m.get("content", "")
|
|
transcript_lines.append(f"{role}: {content}")
|
|
transcript = "\n".join(transcript_lines)
|
|
prompt = (
|
|
"Summarize this conversation in 1-3 sentences. "
|
|
"Focus on what the user discussed, asked about, or did.\n\n"
|
|
f"CONVERSATION:\n{transcript}\n\nSUMMARY:"
|
|
)
|
|
try:
|
|
summary = self.agent.chat(prompt)
|
|
return summary.strip()
|
|
except Exception as e:
|
|
return f"Summary unavailable (error: {e}). {len(self.messages)} messages exchanged."
|
|
|
|
def get_context_summary(self) -> dict:
|
|
"""Get session summary for monitoring."""
|
|
return {
|
|
"user": self.username,
|
|
"room": self.room,
|
|
"messages": len(self.messages),
|
|
"last_active": datetime.fromtimestamp(self.last_active).isoformat(),
|
|
"created": self.created_at,
|
|
}
|
|
|
|
|
|
class SessionManager:
|
|
"""Manages all user sessions."""
|
|
|
|
def __init__(self, max_sessions: int = 20, session_timeout: int = 3600):
|
|
self.sessions: dict[str, UserSession] = {}
|
|
self.max_sessions = max_sessions
|
|
self.session_timeout = session_timeout
|
|
self._lock = threading.Lock()
|
|
self._summaries_path = WORLD_DIR / 'session_summaries.jsonl'
|
|
|
|
def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession:
|
|
"""Get existing session or create new one."""
|
|
with self._lock:
|
|
self._cleanup_stale()
|
|
|
|
if user_id not in self.sessions:
|
|
if len(self.sessions) >= self.max_sessions:
|
|
self._evict_oldest()
|
|
self.sessions[user_id] = UserSession(user_id, username, room)
|
|
|
|
session = self.sessions[user_id]
|
|
session.room = room # Update room if moved
|
|
session.last_active = time.time()
|
|
return session
|
|
|
|
def _cleanup_stale(self):
|
|
"""Remove sessions that timed out, saving summaries first."""
|
|
now = time.time()
|
|
stale = [uid for uid, s in self.sessions.items()
|
|
if now - s.last_active > self.session_timeout]
|
|
for uid in stale:
|
|
session = self.sessions[uid]
|
|
self._save_summary(session)
|
|
del self.sessions[uid]
|
|
|
|
def _evict_oldest(self):
|
|
"""Evict the least recently active session, saving summary first."""
|
|
if not self.sessions:
|
|
return
|
|
oldest = min(self.sessions.items(), key=lambda x: x[1].last_active)
|
|
self._save_summary(oldest[1])
|
|
del self.sessions[oldest[0]]
|
|
|
|
def _save_summary(self, session: UserSession):
|
|
"""Generate summary via LLM and append to JSONL file."""
|
|
try:
|
|
summary_text = session.get_summary()
|
|
record = {
|
|
"user_id": session.user_id,
|
|
"username": session.username,
|
|
"room": session.room,
|
|
"message_count": len(session.messages),
|
|
"created_at": session.created_at,
|
|
"ended_at": datetime.now().isoformat(),
|
|
"summary": summary_text,
|
|
}
|
|
with open(self._summaries_path, 'a') as f:
|
|
f.write(json.dumps(record) + '\n')
|
|
except Exception as e:
|
|
print(f"[SessionManager] Failed to save summary for {session.user_id}: {e}")
|
|
|
|
def list_sessions(self) -> list:
    """Return ``get_context_summary()`` for every active session.

    The session table is read under ``self._lock``.
    """
    summaries = []
    with self._lock:
        for active in self.sessions.values():
            summaries.append(active.get_context_summary())
    return summaries
|
|
|
|
def get_session_count(self) -> int:
    """Return how many sessions are currently tracked (read under the lock)."""
    with self._lock:
        active_total = len(self.sessions)
    return active_total
|
|
|
|
def save_sessions(self) -> int:
    """Persist all active sessions to ``SESSIONS_FILE`` as JSON.

    The snapshot is built and serialized while holding ``self._lock`` so it
    cannot be torn by a concurrent update. It is then written to a temporary
    sibling file and moved into place with ``os.replace``, so a crash
    mid-write never leaves a truncated sessions file for ``load_sessions``
    to reject.

    Returns:
        The number of sessions written.

    Raises:
        OSError: If the directory or file cannot be written.
    """
    with self._lock:
        data = {
            "saved_at": datetime.now().isoformat(),
            "sessions": {
                uid: {
                    "user_id": session.user_id,
                    "username": session.username,
                    "room": session.room,
                    "messages": session.messages,
                    "created_at": session.created_at,
                    "last_active": session.last_active,
                }
                for uid, session in self.sessions.items()
            },
        }
        count = len(data["sessions"])
        # Serialize under the lock: "messages" references the live list.
        payload = json.dumps(data, indent=2)

    SESSIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Per-process/per-thread temp name so concurrent savers never share a file.
    tmp_path = SESSIONS_FILE.with_name(
        f"{SESSIONS_FILE.name}.{os.getpid()}.{threading.get_ident()}.tmp"
    )
    try:
        tmp_path.write_text(payload)
        os.replace(tmp_path, SESSIONS_FILE)
    finally:
        # No-op after a successful replace; cleans up after a failed write.
        tmp_path.unlink(missing_ok=True)

    if count > 0:
        print(f"[SessionManager] Saved {count} sessions to {SESSIONS_FILE}")
    return count
|
|
|
|
def load_sessions(self) -> int:
    """Restore sessions previously written by ``save_sessions``.

    Loading stops once ``max_sessions`` is reached. A record that cannot be
    turned into a session is reported and skipped instead of aborting the
    whole load, so the returned count always matches what was inserted.

    Returns:
        The number of sessions loaded; 0 when the file is missing or cannot
        be read or parsed. This method never raises.
    """
    if not SESSIONS_FILE.exists():
        return 0
    try:
        data = json.loads(SESSIONS_FILE.read_text())
        records = list(data.get("sessions", {}).items())
    except Exception as e:
        print(f"[SessionManager] Failed to load sessions: {e}")
        return 0

    loaded = 0
    with self._lock:
        for uid, sdata in records:
            if len(self.sessions) >= self.max_sessions:
                break
            try:
                session = UserSession(
                    sdata["user_id"],
                    sdata["username"],
                    sdata.get("room", "The Threshold"),
                )
                session.messages = sdata.get("messages", [])
                session.created_at = sdata.get("created_at", session.created_at)
                session.last_active = sdata.get("last_active", time.time())
            except Exception as e:
                # One bad record must not discard the rest of the file.
                print(f"[SessionManager] Skipping malformed session {uid!r}: {e}")
                continue
            self.sessions[uid] = session
            loaded += 1

    if loaded > 0:
        print(f"[SessionManager] Loaded {loaded} sessions from {SESSIONS_FILE}")
    return loaded
|
|
|
|
|
|
# ── HTTP API ───────────────────────────────────────────────────────────
|
|
|
|
# Module-level singletons used by BridgeHandler for every request.
session_manager = SessionManager()
presence_manager = PresenceManager()
notification_manager = NotificationManager()
# Wall-clock time at import; /bridge/stats subtracts it to report uptime.
_server_start_time = time.time()

# ── Latency Tracking ──────────────────────────────────────────────────
# Recent latency samples (dicts built by record_latency); guarded by _latencies_lock.
_latencies: list[dict] = []
_latencies_lock = threading.Lock()
# Maximum number of samples kept; record_latency drops the oldest beyond this.
_max_latencies = 1000
|
|
|
|
def record_latency(user_id: str, room: str, duration_ms: float):
    """Store one latency sample, keeping only the newest ``_max_latencies``.

    Args:
        user_id: User the measured request belonged to.
        room: Room the request was made from.
        duration_ms: Elapsed time in milliseconds; stored rounded to 2 places.
    """
    with _latencies_lock:
        sample = {
            "user_id": user_id,
            "room": room,
            "duration_ms": round(duration_ms, 2),
            "timestamp": datetime.now().isoformat(),
        }
        _latencies.append(sample)
        # Trim from the front so the oldest samples are discarded first.
        overflow = len(_latencies) - _max_latencies
        if overflow > 0:
            del _latencies[:overflow]
|
|
|
|
def get_latency_stats() -> dict:
    """Return aggregate statistics over the recorded latency samples.

    Returns:
        A dict with ``count``, ``average_ms``, ``min_ms``, ``max_ms`` and
        ``recent`` (up to the 10 newest samples). With no samples recorded
        the numeric fields are 0 and ``recent`` is an empty list, so callers
        can rely on the same keys in both cases.
    """
    with _latencies_lock:
        if not _latencies:
            return {"count": 0, "average_ms": 0, "min_ms": 0, "max_ms": 0, "recent": []}
        durations = [entry["duration_ms"] for entry in _latencies]
        return {
            "count": len(durations),
            "average_ms": round(sum(durations) / len(durations), 2),
            "min_ms": round(min(durations), 2),
            "max_ms": round(max(durations), 2),
            "recent": _latencies[-10:],
        }
|
|
|
|
class BridgeHandler(BaseHTTPRequestHandler):
|
|
"""HTTP handler for multi-user bridge."""
|
|
|
|
def do_GET(self):
    """Serve the bridge's read-only JSON endpoints.

    The request target is split into path and query string once, up front.
    Routing uses only the path, so endpoints that accept a query string
    (``.../events?since=``, ``.../chat?since=``,
    ``/bridge/notifications/<id>?mark_read=``) still match when one is
    supplied. Identifiers taken from the path are percent-decoded, so names
    containing spaces (``The%20Threshold``) resolve to the stored name.

    Any path that matches no route gets ``{"error": "not found"}`` with 404.
    """
    from urllib.parse import parse_qs, unquote, urlsplit

    target = urlsplit(self.path)
    path = target.path
    query = parse_qs(target.query)
    parts = path.split('/')

    if path == '/bridge/health':
        state_file = WORLD_DIR / 'world_state.json'
        ws = json.loads(state_file.read_text()) if state_file.exists() else {}
        self._json_response({
            "status": "ok",
            "active_sessions": session_manager.get_session_count(),
            "tick": ws.get("tick", 0),
            "time_of_day": ws.get("time_of_day"),
            "weather": ws.get("weather"),
            "world_tick_running": world_tick_system._running,
            "timestamp": datetime.now().isoformat(),
        })
    elif path == '/bridge/sessions':
        self._json_response({
            "sessions": session_manager.list_sessions(),
        })
    elif path.startswith('/bridge/world/'):
        room = unquote(path.split('/bridge/world/')[-1])
        state_file = WORLD_DIR / 'world_state.json'
        if state_file.exists():
            state = json.loads(state_file.read_text())
            room_data = state.get('rooms', {}).get(room, {})
            self._json_response({"room": room, "data": room_data})
        else:
            self._json_response({"room": room, "data": {}})
    elif path.startswith('/bridge/room/') and path.endswith('/players'):
        # GET /bridge/room/<room>/players
        # parts == ['', 'bridge', 'room', '<room>', 'players']
        room = unquote(parts[3])
        players = presence_manager.get_players_in_room(room)
        self._json_response({"room": room, "players": players})
    elif path.startswith('/bridge/room/') and path.endswith('/events'):
        # GET /bridge/room/<room>/events[?since=<timestamp>]
        room = unquote(parts[3])
        since = query.get('since', [None])[0]
        events = presence_manager.get_room_events(room, since)
        self._json_response({"room": room, "events": events})
    elif path == '/bridge/world-state':
        # GET /bridge/world-state — full world state
        state_file = WORLD_DIR / 'world_state.json'
        world_state = json.loads(state_file.read_text()) if state_file.exists() else {}

        # Build room player lists and conversation counts
        rooms = world_state.get('rooms', {})
        room_details = {}
        for room_name, room_data in rooms.items():
            players = presence_manager.get_players_in_room(room_name)
            # Count messages across all sessions in this room
            with session_manager._lock:
                conv_count = sum(
                    len(s.messages) for s in session_manager.sessions.values()
                    if s.room == room_name
                )
            room_details[room_name] = {
                **room_data,
                "players": players,
                "timmy_conversation_count": conv_count,
            }

        self._json_response({
            "world": world_state,
            "rooms": room_details,
            "active_sessions": session_manager.list_sessions(),
        })
    elif path == '/bridge/latency':
        # GET /bridge/latency — latency stats
        self._json_response(get_latency_stats())
    elif path == '/bridge/stats':
        # GET /bridge/stats — aggregate stats
        # Hold the session lock while iterating: other request threads add
        # and remove sessions concurrently.
        with session_manager._lock:
            total_messages = sum(len(s.messages) for s in session_manager.sessions.values())
        # Rooms with at least one player
        rooms_with_players = [
            room for room in presence_manager._rooms
            if presence_manager._rooms[room]
        ]
        uptime_seconds = time.time() - _server_start_time
        self._json_response({
            "total_sessions": session_manager.get_session_count(),
            "total_messages": total_messages,
            "rooms_with_players": rooms_with_players,
            "rooms_with_players_count": len(rooms_with_players),
            "uptime_seconds": round(uptime_seconds, 1),
            "timestamp": datetime.now().isoformat(),
        })
    elif path.startswith('/bridge/notifications/'):
        # GET /bridge/notifications/<user_id>[?mark_read=false]
        user_id = unquote(path.split('/bridge/notifications/')[-1].rstrip('/'))
        mark_read = query.get('mark_read', ['true'])[0].lower() != 'false'
        notifications = notification_manager.get_pending(user_id, mark_read=mark_read)
        self._json_response({
            "user_id": user_id,
            "notifications": notifications,
            "unread_count": notification_manager.get_unread_count(user_id),
        })
    elif path.startswith('/bridge/inventory/'):
        # GET /bridge/inventory/<user_id> — list user's inventory items
        user_id = unquote(path.split('/bridge/inventory/')[-1].rstrip('/'))
        items = inventory_manager.get_inventory(user_id)
        self._json_response({
            "user_id": user_id,
            "items": items,
            "count": len(items),
        })
    elif path.startswith('/bridge/quests/'):
        # GET /bridge/quests/<user_id> — list active quests for user
        user_id = unquote(path.split('/bridge/quests/')[-1].rstrip('/'))
        quests = quest_manager.get_user_quests(user_id)
        available = quest_manager.get_available_quests()
        self._json_response({
            "user_id": user_id,
            "active_quests": quests,
            "available_quests": available,
        })
    elif path.startswith('/bridge/session/') and path.endswith('/summary'):
        # GET /bridge/session/<user_id>/summary
        # parts == ['', 'bridge', 'session', '<user_id>', 'summary']
        user_id = unquote(parts[3])
        with session_manager._lock:
            session = session_manager.sessions.get(user_id)
        if session:
            summary = session.get_summary()
            self._json_response({
                "user_id": user_id,
                "username": session.username,
                "room": session.room,
                "message_count": len(session.messages),
                "summary": summary,
            })
        else:
            # No live session: fall back to the newest saved summary in the JSONL file.
            saved = None
            if session_manager._summaries_path.exists():
                for line in session_manager._summaries_path.read_text().splitlines():
                    try:
                        entry = json.loads(line)
                        if entry.get("user_id") == user_id:
                            saved = entry  # keep scanning: later lines are newer
                    except json.JSONDecodeError:
                        continue
            if saved:
                self._json_response(saved)
            else:
                self._json_response({"error": "no session or saved summary"}, 404)
    elif path.startswith('/bridge/combat/status/'):
        # GET /bridge/combat/status/<room> — NPC health bars + active fights in room
        room = unquote(path.split('/bridge/combat/status/')[-1].rstrip('/'))
        status = combat_manager.get_room_combat_status(room)
        self._json_response({"room": room, "combat_status": status})
    elif path == '/bridge/combat/npcs':
        # GET /bridge/combat/npcs — list all NPCs
        self._json_response({"npcs": combat_manager.get_all_npcs()})
    elif path == '/bridge/guilds':
        # GET /bridge/guilds — list all guilds
        guilds = guild_manager.list_guilds()
        self._json_response({"ok": True, "guilds": guilds})

    elif path.startswith('/bridge/guild/') and path.endswith('/chat'):
        # GET /bridge/guild/<guild_id>/chat[?since=<timestamp>]
        guild_id = unquote(parts[3]) if len(parts) >= 4 else ''
        since = query.get('since', [None])[0]
        result = guild_manager.get_guild_chat(guild_id, since=since)
        if "error" in result:
            self._json_response(result, 404)
        else:
            self._json_response(result)

    elif path.startswith('/bridge/spells/'):
        # GET /bridge/spells/<user_id> — list known spells for user
        user_id = unquote(path.split('/bridge/spells/')[-1].rstrip('/'))
        spellbook = magic_manager.get_spellbook(user_id)
        if spellbook:
            self._json_response(spellbook)
        else:
            # Return all available spells if user has no spellbook yet
            self._json_response({
                "user_id": user_id,
                "mana": 50,
                "max_mana": 50,
                "known_spells": [],
                "available_spells": magic_manager.get_all_spells(),
            })
    else:
        self._json_response({"error": "not found"}, 404)
|
|
|
|
def do_POST(self):
    """Handle the bridge's state-changing JSON endpoints.

    The request body must be a JSON object (or be absent). A malformed
    ``Content-Length``, invalid JSON, or a non-object body is answered with
    a 400 JSON error instead of raising inside the handler thread.

    Routes: /bridge/chat, /bridge/move, /bridge/say, /bridge/command,
    /bridge/quest/complete, /bridge/quest/assign, /bridge/take, /bridge/drop,
    /bridge/combat/{start,attack,defend,flee}, /bridge/spell/{cast,learn}.
    Any other path gets ``{"error": "not found"}`` with 404.
    """
    try:
        content_length = int(self.headers.get('Content-Length', 0))
    except (TypeError, ValueError):
        content_length = -1
    if content_length < 0:
        # A negative length would make rfile.read() block until EOF.
        self._json_response({"error": "invalid Content-Length"}, 400)
        return
    try:
        body = json.loads(self.rfile.read(content_length)) if content_length else {}
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        self._json_response({"error": "invalid JSON body"}, 400)
        return
    if not isinstance(body, dict):
        self._json_response({"error": "JSON body must be an object"}, 400)
        return

    if self.path == '/bridge/chat':
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        message = body.get('message', '')
        room = body.get('room', 'The Threshold')

        if not message:
            self._json_response({"error": "no message"}, 400)
            return

        session = session_manager.get_or_create(user_id, username, room)
        # Track presence — enter room if not already there
        occupants = presence_manager.get_players_in_room(room) or []
        if not any(p["user_id"] == user_id for p in occupants):
            enter_event = presence_manager.enter_room(user_id, username, room)
            # Auto-notify: new player joined
            if enter_event:
                notification_manager.broadcast_room(
                    room, "player_join",
                    f"{username} has entered {room}.",
                    exclude_user=user_id, data=enter_event)
            # Plugin hook: on_join
            plugin_msg = plugin_registry.fire_on_join(user_id, room)
            if plugin_msg:
                notification_manager.notify(user_id, "plugin", plugin_msg, room=room, username=username)

        # Plugin hook: on_message (can override response)
        plugin_override = plugin_registry.fire_on_message(user_id, message, room)
        if plugin_override is not None:
            response = plugin_override
        else:
            response = session.chat(message)

        # Auto-notify: crisis detection — scan response for crisis protocol keywords
        crisis_keywords = ["988", "741741", "safe right now", "crisis", "Crisis Text Line"]
        if any(kw in response for kw in crisis_keywords):
            notification_manager.notify(
                user_id, "crisis",
                "Crisis resources have been provided. You are not alone.",
                room=room, username=username,
                data={"response_contains_crisis_protocol": True})
            # Also notify admins in the room
            notification_manager.broadcast_room(
                room, "crisis_alert",
                f"Crisis detected for {username} in {room}.",
                exclude_user=user_id,
                data={"triggered_by": user_id, "room": room})

        self._json_response({
            "response": response,
            "user": username,
            "room": room,
            "session_messages": len(session.messages),
        })

    elif self.path == '/bridge/move':
        user_id = body.get('user_id')
        new_room = body.get('room')
        if not new_room:
            self._json_response({"error": "room required"}, 400)
            return
        # Hold the session lock only for the lookup and room update, not
        # across presence calls, plugin hooks and the socket write.
        with session_manager._lock:
            session = session_manager.sessions.get(user_id)
            if session is not None:
                old_room = session.room
                session.room = new_room
        if session is None:
            self._json_response({"error": "no session"}, 404)
            return

        # Leave old room
        leave_event = presence_manager.leave_room(user_id, old_room)
        # Auto-notify: player left
        if leave_event:
            notification_manager.broadcast_room(
                old_room, "player_leave",
                f"{session.username} has left {old_room}.",
                exclude_user=user_id, data=leave_event)
        # Plugin hook: on_leave
        plugin_leave = plugin_registry.fire_on_leave(user_id, old_room)
        if plugin_leave:
            notification_manager.broadcast_room(
                old_room, "plugin", plugin_leave,
                exclude_user=user_id)
        # Enter new room
        enter_event = presence_manager.enter_room(user_id, session.username, new_room)
        # Auto-notify: player joined
        if enter_event:
            notification_manager.broadcast_room(
                new_room, "player_join",
                f"{session.username} has entered {new_room}.",
                exclude_user=user_id, data=enter_event)
        self._json_response({
            "ok": True,
            "room": new_room,
            "events": [e for e in [leave_event, enter_event] if e],
        })

    elif self.path == '/bridge/say':
        # POST /bridge/say — user says something visible to all in room
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        message = body.get('message', '')
        room = body.get('room', 'The Threshold')

        if not message:
            self._json_response({"error": "no message"}, 400)
            return

        event = presence_manager.say(user_id, username, room, message)
        # Get list of players who should see it
        players = presence_manager.get_players_in_room(room)
        self._json_response({
            "ok": True,
            "event": event,
            "recipients": players,
        })

    elif self.path == '/bridge/command':
        # POST /bridge/command — parse MUD-style commands
        # Body: { user_id, username, room, command }
        # Commands: look, go <dir>, examine <obj>, say <msg>, ask <msg>
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        raw_command = body.get('command', '').strip()

        if not raw_command:
            self._json_response({"error": "no command"}, 400)
            return

        result = self._parse_mud_command(user_id, username, room, raw_command)
        self._json_response(result)

    elif self.path == '/bridge/quest/complete':
        # POST /bridge/quest/complete — mark objective done
        # Body: { user_id, quest_id, objective_index }
        user_id = body.get('user_id')
        quest_id = body.get('quest_id')
        objective_index = body.get('objective_index')
        if not user_id or not quest_id or objective_index is None:
            self._json_response({"error": "user_id, quest_id, and objective_index required"}, 400)
            return
        try:
            objective_index = int(objective_index)
        except (TypeError, ValueError):
            self._json_response({"error": "objective_index must be an integer"}, 400)
            return
        result = quest_manager.complete_objective(quest_id, user_id, objective_index)
        if "error" in result:
            self._json_response(result, 400)
        else:
            self._json_response(result)

    elif self.path == '/bridge/quest/assign':
        # POST /bridge/quest/assign — assign a quest to a user
        # Body: { user_id, quest_id }
        user_id = body.get('user_id')
        quest_id = body.get('quest_id')
        if not user_id or not quest_id:
            self._json_response({"error": "user_id and quest_id required"}, 400)
            return
        quest = quest_manager.assign(quest_id, user_id)
        if quest:
            self._json_response({"ok": True, "quest": quest.to_dict()})
        else:
            self._json_response({"error": "Quest not found"}, 404)

    elif self.path == '/bridge/take':
        # POST /bridge/take — pick up item from room
        # Body: { user_id, username, room, item }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        item_name = body.get('item', '')
        if not item_name:
            self._json_response({"error": "item required"}, 400)
            return
        result = inventory_manager.take_item(user_id, username, room, item_name)
        if "error" in result:
            self._json_response(result, 400)
        else:
            # Notify others in room
            notification_manager.broadcast_room(
                room, "item_taken",
                f"{username} picked up {result['item']['name']}.",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/drop':
        # POST /bridge/drop — drop item in room
        # Body: { user_id, username, room, item }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        item_name = body.get('item', '')
        if not item_name:
            self._json_response({"error": "item required"}, 400)
            return
        result = inventory_manager.drop_item(user_id, username, room, item_name)
        if "error" in result:
            self._json_response(result, 400)
        else:
            # Notify others in room
            notification_manager.broadcast_room(
                room, "item_dropped",
                f"{username} dropped {result['item']['name']}.",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/start':
        # POST /bridge/combat/start — begin fight with NPC
        # Body: { user_id, username, room, npc_id }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        npc_id = body.get('npc_id', '')
        if not npc_id:
            self._json_response({"error": "npc_id required"}, 400)
            return
        result = combat_manager.start_fight(user_id, username, room, npc_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            npc = result.get("npc", {})
            notification_manager.broadcast_room(
                room, "combat_start",
                f"{username} has engaged {npc.get('name', 'an enemy')} in combat!",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/attack':
        # POST /bridge/combat/attack — attack NPC in active fight
        # Body: { user_id, username, room }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        result = combat_manager.attack(user_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            enc = result.get("encounter", {})
            npc = result.get("npc", {})
            loot = result.get("loot", [])
            # Broadcast combat action to room
            combat_msg = f"{username} strikes {npc.get('name', 'the enemy')} for {result.get('player_dmg', 0)} damage!"
            notification_manager.broadcast_room(
                room, "combat_action", combat_msg,
                exclude_user=user_id, data=result)
            # If NPC died, broadcast death
            if not npc.get("alive", True):
                death_msg = f"{npc.get('name', 'The enemy')} has been slain by {username}!"
                if loot:
                    death_msg += f" Loot: {', '.join(i['name'] for i in loot)}"
                notification_manager.broadcast_room(
                    room, "combat_death", death_msg, data=result)
            # If player died
            if enc.get("player_hp", 1) <= 0:
                notification_manager.broadcast_room(
                    room, "combat_defeat",
                    f"{username} has been defeated in combat!", data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/defend':
        # POST /bridge/combat/defend — defend (half damage next hit)
        # Body: { user_id, username, room }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        result = combat_manager.defend(user_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            notification_manager.broadcast_room(
                room, "combat_action",
                f"{username} braces defensively.",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/combat/flee':
        # POST /bridge/combat/flee — flee from combat
        # Body: { user_id, username, room }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        result = combat_manager.flee(user_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            notification_manager.broadcast_room(
                room, "combat_flee",
                f"{username} flees from combat!",
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/spell/cast':
        # POST /bridge/spell/cast — cast a spell
        # Body: { user_id, username, room, spell_id, target? }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        room = body.get('room', 'The Threshold')
        spell_id = body.get('spell_id', '')
        target = body.get('target')
        if not spell_id:
            self._json_response({"error": "spell_id required"}, 400)
            return
        result = magic_manager.cast_spell(user_id, username, room, spell_id, target)
        if "error" in result:
            self._json_response(result, 400)
        else:
            # Broadcast spell casting to room
            notification_manager.broadcast_room(
                room, "spell_cast",
                result.get("message", f"{username} casts a spell!"),
                exclude_user=user_id, data=result)
            self._json_response(result)

    elif self.path == '/bridge/spell/learn':
        # POST /bridge/spell/learn — learn a new spell
        # Body: { user_id, username, spell_id }
        user_id = body.get('user_id', 'anonymous')
        username = body.get('username', 'Anonymous')
        spell_id = body.get('spell_id', '')
        if not spell_id:
            self._json_response({"error": "spell_id required"}, 400)
            return
        result = magic_manager.learn_spell(user_id, username, spell_id)
        if "error" in result:
            self._json_response(result, 400)
        else:
            self._json_response(result)

    else:
        self._json_response({"error": "not found"}, 404)
|
|
|
|
def _parse_mud_command(self, user_id: str, username: str, room: str, command: str) -> dict:
|
|
"""Parse and execute a MUD-style command."""
|
|
parts = command.split(None, 1)
|
|
verb = parts[0].lower() if parts else ''
|
|
arg = parts[1].strip() if len(parts) > 1 else ''
|
|
|
|
state_file = WORLD_DIR / 'world_state.json'
|
|
world_state = json.loads(state_file.read_text()) if state_file.exists() else {}
|
|
rooms = world_state.get('rooms', {})
|
|
room_data = rooms.get(room, {})
|
|
|
|
if verb == 'look':
|
|
# Return room description and scene
|
|
desc = room_data.get('description_base', 'An empty room.')
|
|
desc_dynamic = room_data.get('description_dynamic', '')
|
|
objects = room_data.get('objects', [])
|
|
players = presence_manager.get_players_in_room(room)
|
|
|
|
scene = [desc]
|
|
if desc_dynamic:
|
|
scene.append(desc_dynamic)
|
|
if objects:
|
|
scene.append(f"You see: {', '.join(objects)}.")
|
|
# Show items on the ground
|
|
ground_items = inventory_manager.get_room_items(room)
|
|
if ground_items:
|
|
item_names = [it["name"] for it in ground_items]
|
|
scene.append(f"On the ground: {', '.join(item_names)}.")
|
|
if players:
|
|
names = [p['username'] for p in players if p['user_id'] != user_id]
|
|
if names:
|
|
scene.append(f"Also here: {', '.join(names)}.")
|
|
# Build exits from description_base
|
|
# Try world_state exits first, fall back to description parsing
|
|
exits = room_data.get('exits', {})
|
|
if not exits:
|
|
exits = self._extract_exits(room_data.get('description_base', ''))
|
|
if exits:
|
|
scene.append(f"Exits: {', '.join(exits.keys())}.")
|
|
|
|
return {
|
|
"command": "look",
|
|
"room": room,
|
|
"description": "\n".join(scene),
|
|
"objects": objects,
|
|
"exits": exits,
|
|
"players": players,
|
|
}
|
|
|
|
elif verb == 'go':
|
|
if not arg:
|
|
return {"command": "go", "error": "Go where? Specify a direction."}
|
|
direction = arg.lower()
|
|
# Try world_state exits first, fall back to description parsing
|
|
exits = room_data.get('exits', {})
|
|
if not exits:
|
|
exits = self._extract_exits(room_data.get('description_base', ''))
|
|
# Match direction
|
|
dest = None
|
|
for dir_name, dest_room in exits.items():
|
|
if dir_name.lower().startswith(direction) or direction in dir_name.lower():
|
|
dest = dest_room
|
|
break
|
|
if not dest:
|
|
return {"command": "go", "error": f"You can't go '{arg}' from here. Exits: {', '.join(exits.keys()) if exits else 'none'}."}
|
|
if dest not in rooms:
|
|
return {"command": "go", "error": f"Unknown destination: {dest}."}
|
|
# Move user — auto-notify
|
|
leave_event = presence_manager.leave_room(user_id, room)
|
|
if leave_event:
|
|
notification_manager.broadcast_room(
|
|
room, "player_leave",
|
|
f"{username} has left {room}.",
|
|
exclude_user=user_id, data=leave_event)
|
|
enter_event = presence_manager.enter_room(user_id, username, dest)
|
|
if enter_event:
|
|
notification_manager.broadcast_room(
|
|
dest, "player_join",
|
|
f"{username} has entered {dest}.",
|
|
exclude_user=user_id, data=enter_event)
|
|
# Update session room
|
|
with session_manager._lock:
|
|
if user_id in session_manager.sessions:
|
|
session_manager.sessions[user_id].room = dest
|
|
# Return new room look
|
|
new_room_data = rooms[dest]
|
|
desc = new_room_data.get('description_base', 'An empty room.')
|
|
desc_dynamic = new_room_data.get('description_dynamic', '')
|
|
objects = new_room_data.get('objects', [])
|
|
players = presence_manager.get_players_in_room(dest)
|
|
scene = [f"You go {arg}.\n"]
|
|
scene.append(desc)
|
|
if desc_dynamic:
|
|
scene.append(desc_dynamic)
|
|
if objects:
|
|
scene.append(f"You see: {', '.join(objects)}.")
|
|
new_exits = self._extract_exits(desc)
|
|
if new_exits:
|
|
scene.append(f"Exits: {', '.join(new_exits.keys())}.")
|
|
return {
|
|
"command": "go",
|
|
"direction": arg,
|
|
"room": dest,
|
|
"description": "\n".join(scene),
|
|
"exits": new_exits,
|
|
}
|
|
|
|
elif verb == 'examine':
|
|
if not arg:
|
|
return {"command": "examine", "error": "Examine what?"}
|
|
objects = room_data.get('objects', [])
|
|
# Fuzzy match object
|
|
target = arg.lower()
|
|
matched = None
|
|
for obj in objects:
|
|
if target in obj.lower() or obj.lower() in target:
|
|
matched = obj
|
|
break
|
|
if not matched:
|
|
return {"command": "examine", "error": f"You don't see '{arg}' here. Objects: {', '.join(objects) if objects else 'none'}."}
|
|
# Check whiteboard
|
|
if matched == 'whiteboard':
|
|
wb = room_data.get('whiteboard', [])
|
|
wb_text = "\n".join(f" - {w}" for w in wb) if wb else "(empty)"
|
|
return {"command": "examine", "object": matched, "description": f"The whiteboard reads:\n{wb_text}"}
|
|
# Default descriptions for known objects
|
|
descriptions = {
|
|
"stone floor": "Smooth stone, worn by countless footsteps.",
|
|
"doorframe": "An archway of fitted stone, humming faintly with energy.",
|
|
"server racks": "Wrought-iron racks filled with humming servers. Green LEDs blink in the dim light.",
|
|
"green LED": "A single green LED, pulsing steadily. Heartbeat.",
|
|
"cot": "A simple cot in the corner. Someone sleeps here sometimes.",
|
|
"anvil": "A heavy iron anvil, scarred and dented from years of use.",
|
|
"hammer": "A well-worn hammer, its handle smooth from use.",
|
|
"tongs": "Iron tongs, blackened by the forge.",
|
|
"hearth": "The hearth glows with banked coals. Warmth radiates outward.",
|
|
"tools": "Hammers, tongs, chisels — all well-used.",
|
|
"stone bench": "A cool stone bench under the oak tree. A good place to sit.",
|
|
"oak tree": "An old oak tree, its branches spreading wide over the garden.",
|
|
"herbs": "Lavender, rosemary, thyme — fragrant and green.",
|
|
"wildflowers": "Small flowers in purple, yellow, and white.",
|
|
"railing": "The bridge railing is worn smooth. Carvings mark the stone.",
|
|
"dark water": "Dark water flows beneath the bridge. You cannot see the bottom.",
|
|
}
|
|
desc = descriptions.get(matched, f"You look at the {matched}. It seems ordinary enough.")
|
|
return {"command": "examine", "object": matched, "description": desc}
|
|
|
|
elif verb == 'say':
|
|
if not arg:
|
|
return {"command": "say", "error": "Say what?"}
|
|
event = presence_manager.say(user_id, username, room, arg)
|
|
players = presence_manager.get_players_in_room(room)
|
|
return {
|
|
"command": "say",
|
|
"message": arg,
|
|
"room": room,
|
|
"event": event,
|
|
"recipients": players,
|
|
}
|
|
|
|
elif verb == 'ask':
|
|
if not arg:
|
|
return {"command": "ask", "error": "Ask what?"}
|
|
# Forward to chat endpoint logic
|
|
session = session_manager.get_or_create(user_id, username, room)
|
|
if not presence_manager.get_players_in_room(room) or \
|
|
not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)):
|
|
presence_manager.enter_room(user_id, username, room)
|
|
response = session.chat(arg)
|
|
return {
|
|
"command": "ask",
|
|
"message": arg,
|
|
"room": room,
|
|
"response": response,
|
|
"session_messages": len(session.messages),
|
|
}
|
|
|
|
else:
|
|
return {
|
|
"command": verb,
|
|
"error": f"Unknown command: '{verb}'. Try: look, go <dir>, examine <obj>, say <msg>, ask <msg>",
|
|
}
|
|
|
|
def _extract_exits(self, description: str) -> dict:
|
|
"""Extract exits from room description like 'North to the Tower. East to the Garden.'"""
|
|
import re
|
|
exits = {}
|
|
# Match patterns like "North to the Tower", "East to the Garden"
|
|
pattern = r'(North|South|East|West|Up|Down|Northeast|Northwest|Southeast|Southwest)\s+to\s+(?:the\s+)?([A-Z][a-zA-Z\s]+?)(?:\.|$)'
|
|
for match in re.finditer(pattern, description):
|
|
direction = match.group(1)
|
|
destination = match.group(2).strip()
|
|
# Reconstruct full room name
|
|
room_name = destination if destination.startswith('The ') else f"The {destination}"
|
|
exits[direction] = room_name
|
|
return exits
|
|
|
|
def _json_response(self, data: dict, code: int = 200):
|
|
self.send_response(code)
|
|
self.send_header('Content-Type', 'application/json')
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(data).encode())
|
|
|
|
def log_message(self, format, *args):
    """Discard per-request log lines instead of printing them (suppresses HTTP logs)."""
    return None
|
|
|
|
|
|
# ── World Tick System ──────────────────────────────────────────────────
|
|
|
|
import random
|
|
|
|
class WorldTickSystem:
    """Advances the world state every TICK_INTERVAL seconds.

    On each tick:
      - Increment tick counter
      - Cycle time_of_day (dawn -> morning -> midday -> afternoon -> dusk -> night)
      - Every 3rd tick, pick a new weather value at random from WEATHER_OPTIONS
      - Evolve room states (only for rooms already present in the saved state):
          * The Forge: fire decays (blazing -> burning -> glowing -> embers -> cold)
          * The Garden: growth advances (seeds -> sprouts -> growing -> blooming
            -> harvest), then resets to seeds
          * The Bridge: rain_active follows the weather
      - Regenerate mana via magic_manager.regenerate_mana()
      - Save world state to WORLD_DIR / 'world_state.json'
      - Broadcast world events to rooms that currently have players

    The loop runs in a daemon thread. stop() wakes the thread immediately,
    so no further tick runs once stop() has returned control to the loop.
    """

    TICK_INTERVAL = 60  # seconds

    TIME_CYCLE = ["dawn", "morning", "midday", "afternoon", "dusk", "night"]

    WEATHER_OPTIONS = [
        "clear", "clear", "clear",  # weighted toward clear
        "cloudy", "cloudy",
        "foggy",
        "light rain",
        "heavy rain",
        "storm",
    ]

    # Weather values that count as rain for The Bridge.
    RAINY_WEATHER = ("light rain", "heavy rain", "storm")

    FIRE_STAGES = ["cold", "embers", "glowing", "burning", "blazing"]
    GROWTH_STAGES = ["seeds", "sprouts", "growing", "blooming", "harvest"]

    def __init__(self):
        self._thread: threading.Thread | None = None
        self._running = False
        self._state_lock = threading.Lock()
        # Set by stop(); the loop waits on it instead of sleeping so that
        # shutdown does not have to wait out a full TICK_INTERVAL.
        self._stop_event = threading.Event()

    # ── lifecycle ───────────────────────────────────────────────────────

    def start(self):
        """Start the tick loop in a daemon thread. No-op if already running."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True, name="world-tick")
        self._thread.start()
        print(f"[WorldTick] Started — ticking every {self.TICK_INTERVAL}s")

    def stop(self):
        """Ask the tick loop to exit and wake it if it is waiting."""
        self._running = False
        self._stop_event.set()

    # ── internal loop ───────────────────────────────────────────────────

    def _loop(self):
        """Wait TICK_INTERVAL seconds, tick, repeat until stopped."""
        while self._running:
            # Event.wait is an interruptible sleep: it returns True as soon
            # as stop() sets the event, and False on a normal timeout.
            if self._stop_event.wait(self.TICK_INTERVAL):
                break
            try:
                self._tick()
            except Exception as e:
                # Top-level boundary of the background thread: report the
                # failure and keep ticking rather than killing the loop.
                print(f"[WorldTick] Error during tick: {e}")

    def _tick(self):
        """Execute one world tick: load, evolve, save, then broadcast."""
        state_file = WORLD_DIR / 'world_state.json'
        events: list[str] = []

        # Load, mutate and save under one lock so the whole
        # read-modify-write of the state file is a single critical section
        # with respect to this lock.
        with self._state_lock:
            if state_file.exists():
                state = json.loads(state_file.read_text())
            else:
                state = {"tick": 0, "time_of_day": "morning", "weather": "clear", "rooms": {}}

            # 1. Advance tick counter
            state["tick"] = state.get("tick", 0) + 1
            tick_num = state["tick"]

            # 2. Time of day — advance every tick
            self._advance_time(state, events)

            # 3. Weather — change every 3 ticks
            if tick_num % 3 == 0:
                self._roll_weather(state, events)

            # 4. Room states
            rooms = state.get("rooms", {})
            self._decay_forge_fire(rooms, events)
            # The Garden — growth advances one stage every 4 ticks
            if tick_num % 4 == 0:
                self._advance_garden(rooms, events)
            self._sync_bridge_rain(rooms, state.get("weather", "clear"), events)

            state["rooms"] = rooms
            state["last_updated"] = datetime.now().isoformat()

            # 5. Mana regeneration
            magic_manager.regenerate_mana()

            # 6. Save world state
            state_file.write_text(json.dumps(state, indent=2))

        # 7. Broadcast events to all occupied rooms (outside lock)
        if events:
            event_text = " ".join(events)
            # Snapshot the room names before iterating.
            for room_name in list(presence_manager._rooms.keys()):
                if presence_manager.get_players_in_room(room_name):
                    notification_manager.broadcast_room(
                        room_name, "world_tick", event_text,
                        data={"tick": state["tick"], "time_of_day": state["time_of_day"],
                              "weather": state.get("weather"), "events": events},
                    )

        print(f"[WorldTick] tick={state['tick']} time={state['time_of_day']} "
              f"weather={state.get('weather')} events={len(events)}")

    # ── per-tick steps ──────────────────────────────────────────────────

    def _advance_time(self, state: dict, events: list) -> None:
        """Move time_of_day to the next entry of TIME_CYCLE, wrapping around."""
        current_time = state.get("time_of_day", "morning")
        try:
            idx = self.TIME_CYCLE.index(current_time)
        except ValueError:
            # Unknown value in the saved state: treat it as the first stage.
            idx = 0
        new_time = self.TIME_CYCLE[(idx + 1) % len(self.TIME_CYCLE)]
        state["time_of_day"] = new_time
        if new_time == "dawn":
            events.append("The sun rises over The Tower. A new day begins.")
        elif new_time == "dusk":
            events.append("The sky darkens. Dusk settles over the world.")
        elif new_time == "night":
            events.append("Night falls. The green LED in The Tower pulses in the dark.")

    def _roll_weather(self, state: dict, events: list) -> None:
        """Pick a random weather value; emit an event only if it changed."""
        old_weather = state.get("weather", "clear")
        new_weather = random.choice(self.WEATHER_OPTIONS)
        state["weather"] = new_weather
        if new_weather != old_weather:
            events.append(f"The weather shifts to {new_weather}.")

    def _decay_forge_fire(self, rooms: dict, events: list) -> None:
        """The Forge — fire decays one stage every 2 ticks unless rekindled."""
        forge = rooms.get("The Forge", {})
        if not forge:
            return
        fire = forge.get("fire_state", "cold")
        untouched = forge.get("fire_untouched_ticks", 0) + 1
        forge["fire_untouched_ticks"] = untouched
        if untouched < 2 or fire == "cold":
            return
        try:
            fi = self.FIRE_STAGES.index(fire)
        except ValueError:
            # Unrecognised fire_state: index 0 means "nothing to decay".
            fi = 0
        if fi > 0:
            forge["fire_state"] = self.FIRE_STAGES[fi - 1]
            forge["fire_untouched_ticks"] = 0
            if forge["fire_state"] == "embers":
                events.append("The forge fire has died down to embers.")
            elif forge["fire_state"] == "cold":
                events.append("The forge fire has gone cold.")

    def _advance_garden(self, rooms: dict, events: list) -> None:
        """The Garden — step growth_stage forward, resetting after harvest."""
        garden = rooms.get("The Garden", {})
        if not garden:
            return
        growth = garden.get("growth_stage", "seeds")
        try:
            gi = self.GROWTH_STAGES.index(growth)
        except ValueError:
            gi = 0
        if gi < len(self.GROWTH_STAGES) - 1:
            garden["growth_stage"] = self.GROWTH_STAGES[gi + 1]
            events.append(f"The garden has advanced to {garden['growth_stage']}.")
        else:
            # Reset cycle
            garden["growth_stage"] = "seeds"
            events.append("The garden has been harvested. New seeds are planted.")

    def _sync_bridge_rain(self, rooms: dict, weather: str, events: list) -> None:
        """The Bridge — rain state follows weather; emit start/stop events."""
        bridge = rooms.get("The Bridge", {})
        if not bridge:
            return
        is_raining = weather in self.RAINY_WEATHER
        was_raining = bridge.get("rain_active", False)
        bridge["rain_active"] = is_raining
        if is_raining and not was_raining:
            events.append("Rain begins to fall on The Bridge.")
        elif not is_raining and was_raining:
            events.append("The rain on The Bridge lets up.")
|
|
|
|
|
|
# Module-level WorldTickSystem instance; main() starts its tick loop.
world_tick_system = WorldTickSystem()
|
|
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Print the startup banner, start the world tick loop, and serve HTTP.

    Blocks in ``serve_forever()`` until interrupted. On Ctrl-C, or on any
    error leaving the serve loop, the tick system is stopped and the
    listening socket is closed.
    """
    print(f"Multi-User AI Bridge starting on {BRIDGE_HOST}:{BRIDGE_PORT}")
    print(f"World dir: {WORLD_DIR}")
    print(f"Max sessions: {session_manager.max_sessions}")
    print()
    print("Endpoints:")
    print(" GET /bridge/health — Health check")
    print(" GET /bridge/sessions — List active sessions")
    print(" GET /bridge/world-state — Full world state (rooms, players, convos)")
    print(" GET /bridge/stats — Aggregate stats (sessions, messages, uptime)")
    print(" GET /bridge/room/<room>/players — List players in a room")
    print(" GET /bridge/room/<room>/events — Room events (presence + chat)")
    print(" GET /bridge/session/<id>/summary — Get conversation summary")
    print(" POST /bridge/chat — Send message to Timmy")
    print(" POST /bridge/say — Say something to room (visible to all)")
    print(" POST /bridge/move — Move user to room (triggers presence)")
    print(" POST /bridge/command — MUD command parser (look, go, examine, say, ask)")
    print()

    # Start world tick system
    world_tick_system.start()

    server = None
    try:
        server = ThreadingHTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the bridge; exit without a traceback.
        print("\nShutting down bridge...")
    finally:
        # Runs on every exit path, including a failed bind.
        world_tick_system.stop()
        if server is not None:
            server.server_close()
|
|
|
|
|
|
# Script entry point: start the bridge only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
|