Files
timmy-home/morrowind/mcp_server.py

452 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Morrowind MCP Server Timmy's game interface.
Exposes Morrowind as tools via MCP (stdio transport).
Perception comes from the OpenMW log file.
Actions go via CGEvent keypresses to the game window.
Register in ~/.hermes/config.yaml:
mcp_servers:
morrowind:
command: "python3"
args: ["/Users/apayne/.timmy/morrowind/mcp_server.py"]
"""
import json
import os
import re
import sys
import time
import subprocess
# MCP SDK
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# ═══════════════════════════════════════
# CONFIG
# ═══════════════════════════════════════
OPENMW_LOG = os.path.expanduser("~/Library/Preferences/openmw/openmw.log")
SCREENSHOT_DIR = os.path.expanduser("~/.timmy/morrowind/screenshots")
OPENMW_BIN = "/Applications/OpenMW.app/Contents/MacOS/openmw"
OPENMW_SAVES_DIR = os.path.expanduser("~/Library/Application Support/openmw/saves")
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# CGEvent key codes
KEYCODES = {
"w": 13, "a": 0, "s": 1, "d": 2,
"space": 49, "escape": 53, "return": 36,
"e": 14, "r": 15, "t": 17, "f": 3, "q": 12,
"tab": 48, "1": 18, "2": 19, "3": 20, "4": 21,
"5": 23, "6": 22, "7": 26, "8": 28, "9": 25,
"f5": 96, "f9": 101, # quicksave / quickload
"backtick": 50, # console
"j": 38, # journal
"up": 126, "down": 125, "left": 123, "right": 124,
}
# ═══════════════════════════════════════
# PERCEPTION — Parse OpenMW log
# ═══════════════════════════════════════
def parse_latest_perception():
"""Parse the most recent perception block from the OpenMW log."""
try:
with open(OPENMW_LOG, "r") as f:
content = f.read()
except FileNotFoundError:
return {"error": "OpenMW log not found. Is the game running?"}
# Find all perception blocks
blocks = re.findall(
r"=== TIMMY PERCEPTION ===\n(.*?)(?:=== END PERCEPTION ===)",
content, re.DOTALL
)
if not blocks:
return {"error": "No perception data in log. Game may not be running or Lua scripts not loaded."}
# Parse latest block
block = blocks[-1]
state = {
"npcs": [],
"doors": [],
"items": [],
}
for line in block.strip().split("\n"):
line = line.strip()
# Strip Lua log prefix if present
if "]:\t" in line:
line = line.split("]:\t", 1)[1]
if line.startswith("Cell:"):
state["cell"] = line.split(":", 1)[1].strip()
elif line.startswith("Pos:"):
state["position"] = line.split(":", 1)[1].strip()
elif line.startswith("Yaw:"):
state["yaw"] = line.split(":", 1)[1].strip()
elif line.startswith("HP:"):
state["health"] = line.split(":", 1)[1].strip()
elif line.startswith("MP:"):
state["magicka"] = line.split(":", 1)[1].strip()
elif line.startswith("FT:"):
state["fatigue"] = line.split(":", 1)[1].strip()
elif line.startswith("Mode:"):
state["mode"] = line.split(":", 1)[1].strip()
elif line.startswith("Time:"):
state["game_time"] = line.split(":", 1)[1].strip()
elif line.startswith("NPC:"):
state["npcs"].append(line[4:].strip())
elif line.startswith("Door:"):
state["doors"].append(line[5:].strip())
elif line.startswith("Item:"):
state["items"].append(line[5:].strip())
return state
def get_game_status():
"""Check if OpenMW is running."""
result = subprocess.run(["pgrep", "-f", "openmw"], capture_output=True, text=True)
running = result.returncode == 0
return {
"running": running,
"pid": result.stdout.strip().split("\n")[0] if running else None,
}
def focus_openmw():
"""Bring OpenMW to the front if it's running."""
subprocess.run(
[
"osascript",
"-e",
'tell application "System Events" to set frontmost of process "openmw" to true',
],
capture_output=True,
text=True,
)
time.sleep(0.5)
def find_savegame(selector=None):
"""Find a savegame path.
selector:
- None / "latest" => newest .omwsave under the OpenMW saves tree
- absolute path => use directly if it exists
- other string => case-insensitive basename substring match, newest first
"""
if selector and os.path.isabs(selector) and os.path.exists(selector):
return selector
candidates = []
for root, _dirs, files in os.walk(OPENMW_SAVES_DIR):
for fname in files:
if fname.endswith(".omwsave"):
path = os.path.join(root, fname)
try:
mtime = os.path.getmtime(path)
except OSError:
continue
candidates.append((mtime, path))
if not candidates:
return None
candidates.sort(reverse=True)
if not selector or str(selector).strip().lower() == "latest":
return candidates[0][1]
selector_lower = str(selector).strip().lower()
for _mtime, path in candidates:
if selector_lower in os.path.basename(path).lower():
return path
return None
def launch_openmw(save=None, new_game=False, wait_for_pid=15.0):
"""Launch OpenMW directly into a save or new game sequence."""
status = get_game_status()
if status["running"]:
focus_openmw()
return {
"launched": False,
"already_running": True,
"pid": status["pid"],
"save": None,
"new_game": False,
"message": "OpenMW is already running; focused existing game window.",
}
if not os.path.exists(OPENMW_BIN):
return {
"error": f"OpenMW binary not found: {OPENMW_BIN}"
}
cmd = [OPENMW_BIN]
save_path = None
if new_game:
cmd.extend(["--skip-menu", "--new-game"])
else:
save_path = find_savegame(save)
if not save_path:
return {
"error": "No savegame found. Provide a save selector/path or create a save first."
}
cmd.extend(["--skip-menu", "--load-savegame", save_path])
proc = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
start_new_session=True,
)
deadline = time.time() + max(1.0, float(wait_for_pid))
pid = None
while time.time() < deadline:
status = get_game_status()
if status["running"]:
pid = status["pid"]
break
time.sleep(0.5)
if pid:
focus_openmw()
return {
"launched": True,
"already_running": False,
"pid": pid or str(proc.pid),
"save": save_path,
"new_game": bool(new_game),
"message": "OpenMW launched with --skip-menu. If loading a save, wait a few seconds before moving.",
}
# ═══════════════════════════════════════
# ACTIONS — CGEvent keypresses
# ═══════════════════════════════════════
def send_key(keycode, duration=0.0, shift=False):
"""Send a keypress to the game via CGEvent."""
import Quartz
flags = Quartz.kCGEventFlagMaskShift if shift else 0
down = Quartz.CGEventCreateKeyboardEvent(None, keycode, True)
Quartz.CGEventSetFlags(down, flags)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, down)
if duration > 0:
time.sleep(duration)
up = Quartz.CGEventCreateKeyboardEvent(None, keycode, False)
Quartz.CGEventSetFlags(up, 0)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, up)
def take_screenshot():
"""Take a screenshot via Quartz."""
import Quartz
import CoreFoundation
image = Quartz.CGDisplayCreateImage(Quartz.CGMainDisplayID())
if not image:
return None
fname = f"morrowind_{int(time.time())}.png"
path = os.path.join(SCREENSHOT_DIR, fname)
url = CoreFoundation.CFURLCreateWithFileSystemPath(None, path, 0, False)
dest = Quartz.CGImageDestinationCreateWithURL(url, "public.png", 1, None)
Quartz.CGImageDestinationAddImage(dest, image, None)
Quartz.CGImageDestinationFinalize(dest)
return path
# ═══════════════════════════════════════
# MCP SERVER
# ═══════════════════════════════════════
app = Server("morrowind")
@app.list_tools()
async def list_tools():
return [
Tool(
name="perceive",
description="Get Timmy's current perception of the game world: position, health, nearby NPCs, doors, items. Updates every 2 seconds from the Lua engine.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="status",
description="Check if Morrowind (OpenMW) is running.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="launch",
description="Launch OpenMW and bypass the main menu. By default loads the newest save with --skip-menu --load-savegame. If new_game=true, starts a new game instead.",
inputSchema={
"type": "object",
"properties": {
"save": {
"type": "string",
"description": "Optional save selector. Use 'latest' (default), an absolute path, or part of a save filename like 'Quicksave'.",
},
"new_game": {
"type": "boolean",
"description": "Start a new game instead of loading a save.",
"default": False,
},
"wait_for_pid": {
"type": "number",
"description": "Seconds to wait for the OpenMW process to appear.",
"default": 15.0,
},
},
"required": [],
},
),
Tool(
name="move",
description="Move the player character. Direction: forward, backward, left, right, turn_left, turn_right. Duration in seconds.",
inputSchema={
"type": "object",
"properties": {
"direction": {
"type": "string",
"enum": ["forward", "backward", "left", "right", "turn_left", "turn_right"],
"description": "Movement direction",
},
"duration": {
"type": "number",
"description": "How long to move in seconds (default: 1.0)",
"default": 1.0,
},
"run": {
"type": "boolean",
"description": "Hold shift to run (default: false)",
"default": False,
},
},
"required": ["direction"],
},
),
Tool(
name="action",
description="Perform a game action: activate (use/interact with what you're looking at), jump, attack, journal, quicksave, quickload, sneak, wait.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["activate", "jump", "attack", "journal", "quicksave", "quickload", "sneak", "wait"],
"description": "Action to perform",
},
},
"required": ["action"],
},
),
Tool(
name="screenshot",
description="Take a screenshot of the game. Returns the file path for vision analysis.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "perceive":
state = parse_latest_perception()
return [TextContent(type="text", text=json.dumps(state, indent=2))]
elif name == "status":
status = get_game_status()
return [TextContent(type="text", text=json.dumps(status, indent=2))]
elif name == "launch":
result = launch_openmw(
save=arguments.get("save"),
new_game=arguments.get("new_game", False),
wait_for_pid=arguments.get("wait_for_pid", 15.0),
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "move":
direction = arguments.get("direction", "forward")
duration = arguments.get("duration", 1.0)
run = arguments.get("run", False)
key_map = {
"forward": "w",
"backward": "s",
"left": "a",
"right": "d",
"turn_left": "left",
"turn_right": "right",
}
key = key_map.get(direction)
if not key:
return [TextContent(type="text", text=f"Unknown direction: {direction}")]
keycode = KEYCODES[key]
send_key(keycode, duration=duration, shift=run)
return [TextContent(type="text", text=f"Moved {direction} for {duration}s" + (" (running)" if run else ""))]
elif name == "action":
action = arguments.get("action")
action_map = {
"activate": ("space", 0.1),
"jump": ("space", 0.05), # tap for jump when moving
"attack": ("f", 0.3), # use key
"journal": ("j", 0.1),
"quicksave": ("f5", 0.1),
"quickload": ("f9", 0.1),
"sneak": ("q", 0.1), # toggle sneak/autowalk depending on config
"wait": ("t", 0.1),
}
if action not in action_map:
return [TextContent(type="text", text=f"Unknown action: {action}")]
key, dur = action_map[action]
send_key(KEYCODES[key], duration=dur)
return [TextContent(type="text", text=f"Performed: {action}")]
elif name == "screenshot":
path = take_screenshot()
if path:
return [TextContent(type="text", text=f"Screenshot saved: {path}")]
else:
return [TextContent(type="text", text="Screenshot failed — display not available")]
return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())