Files
timmy-home/morrowind/mcp_server.py
Alexander Whitestone 9c1dd7fff7 chore: check in all local work — uniwizard, briefings, reports, evennia, morrowind, scripts, specs, training data, angband MCP, diagrams, twitter archive, wizards
- Resolve decisions.md merge conflict (keep both Codex boundary + Ezra/Bezalel entries)
- Update .gitignore: protect bare secret files, exclude venvs and nexus-localhost
- Add uniwizard tools (mention watcher, adaptive prompt router, self-grader, classifiers)
- Add briefings, good-morning reports, production reports
- Add evennia world scaffold and training data
- Add angband and morrowind MCP servers
- Add diagrams, specs, test results, overnight loop scripts
- Add twitter archive insights and media metadata
- Add wizard workspaces (allegro, nahshon)
2026-03-30 17:18:09 -04:00

452 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Morrowind MCP Server — Timmy's game interface.
Exposes Morrowind as tools via MCP (stdio transport).
Perception comes from the OpenMW log file.
Actions go via CGEvent keypresses to the game window.
Register in ~/.hermes/config.yaml:
mcp_servers:
morrowind:
command: "python3"
args: ["/Users/apayne/.timmy/morrowind/mcp_server.py"]
"""
import json
import os
import re
import sys
import time
import subprocess
# MCP SDK
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# ═══════════════════════════════════════
# CONFIG
# ═══════════════════════════════════════
OPENMW_LOG = os.path.expanduser("~/Library/Preferences/openmw/openmw.log")
SCREENSHOT_DIR = os.path.expanduser("~/.timmy/morrowind/screenshots")
OPENMW_BIN = "/Applications/OpenMW.app/Contents/MacOS/openmw"
OPENMW_SAVES_DIR = os.path.expanduser("~/Library/Application Support/openmw/saves")
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# CGEvent key codes
KEYCODES = {
"w": 13, "a": 0, "s": 1, "d": 2,
"space": 49, "escape": 53, "return": 36,
"e": 14, "r": 15, "t": 17, "f": 3, "q": 12,
"tab": 48, "1": 18, "2": 19, "3": 20, "4": 21,
"5": 23, "6": 22, "7": 26, "8": 28, "9": 25,
"f5": 96, "f9": 101, # quicksave / quickload
"backtick": 50, # console
"j": 38, # journal
"up": 126, "down": 125, "left": 123, "right": 124,
}
# ═══════════════════════════════════════
# PERCEPTION — Parse OpenMW log
# ═══════════════════════════════════════
def parse_latest_perception():
"""Parse the most recent perception block from the OpenMW log."""
try:
with open(OPENMW_LOG, "r") as f:
content = f.read()
except FileNotFoundError:
return {"error": "OpenMW log not found. Is the game running?"}
# Find all perception blocks
blocks = re.findall(
r"=== TIMMY PERCEPTION ===\n(.*?)(?:=== END PERCEPTION ===)",
content, re.DOTALL
)
if not blocks:
return {"error": "No perception data in log. Game may not be running or Lua scripts not loaded."}
# Parse latest block
block = blocks[-1]
state = {
"npcs": [],
"doors": [],
"items": [],
}
for line in block.strip().split("\n"):
line = line.strip()
# Strip Lua log prefix if present
if "]:\t" in line:
line = line.split("]:\t", 1)[1]
if line.startswith("Cell:"):
state["cell"] = line.split(":", 1)[1].strip()
elif line.startswith("Pos:"):
state["position"] = line.split(":", 1)[1].strip()
elif line.startswith("Yaw:"):
state["yaw"] = line.split(":", 1)[1].strip()
elif line.startswith("HP:"):
state["health"] = line.split(":", 1)[1].strip()
elif line.startswith("MP:"):
state["magicka"] = line.split(":", 1)[1].strip()
elif line.startswith("FT:"):
state["fatigue"] = line.split(":", 1)[1].strip()
elif line.startswith("Mode:"):
state["mode"] = line.split(":", 1)[1].strip()
elif line.startswith("Time:"):
state["game_time"] = line.split(":", 1)[1].strip()
elif line.startswith("NPC:"):
state["npcs"].append(line[4:].strip())
elif line.startswith("Door:"):
state["doors"].append(line[5:].strip())
elif line.startswith("Item:"):
state["items"].append(line[5:].strip())
return state
def get_game_status():
"""Check if OpenMW is running."""
result = subprocess.run(["pgrep", "-f", "openmw"], capture_output=True, text=True)
running = result.returncode == 0
return {
"running": running,
"pid": result.stdout.strip().split("\n")[0] if running else None,
}
def focus_openmw():
"""Bring OpenMW to the front if it's running."""
subprocess.run(
[
"osascript",
"-e",
'tell application "System Events" to set frontmost of process "openmw" to true',
],
capture_output=True,
text=True,
)
time.sleep(0.5)
def find_savegame(selector=None):
"""Find a savegame path.
selector:
- None / "latest" => newest .omwsave under the OpenMW saves tree
- absolute path => use directly if it exists
- other string => case-insensitive basename substring match, newest first
"""
if selector and os.path.isabs(selector) and os.path.exists(selector):
return selector
candidates = []
for root, _dirs, files in os.walk(OPENMW_SAVES_DIR):
for fname in files:
if fname.endswith(".omwsave"):
path = os.path.join(root, fname)
try:
mtime = os.path.getmtime(path)
except OSError:
continue
candidates.append((mtime, path))
if not candidates:
return None
candidates.sort(reverse=True)
if not selector or str(selector).strip().lower() == "latest":
return candidates[0][1]
selector_lower = str(selector).strip().lower()
for _mtime, path in candidates:
if selector_lower in os.path.basename(path).lower():
return path
return None
def launch_openmw(save=None, new_game=False, wait_for_pid=15.0):
"""Launch OpenMW directly into a save or new game sequence."""
status = get_game_status()
if status["running"]:
focus_openmw()
return {
"launched": False,
"already_running": True,
"pid": status["pid"],
"save": None,
"new_game": False,
"message": "OpenMW is already running; focused existing game window.",
}
if not os.path.exists(OPENMW_BIN):
return {
"error": f"OpenMW binary not found: {OPENMW_BIN}"
}
cmd = [OPENMW_BIN]
save_path = None
if new_game:
cmd.extend(["--skip-menu", "--new-game"])
else:
save_path = find_savegame(save)
if not save_path:
return {
"error": "No savegame found. Provide a save selector/path or create a save first."
}
cmd.extend(["--skip-menu", "--load-savegame", save_path])
proc = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
start_new_session=True,
)
deadline = time.time() + max(1.0, float(wait_for_pid))
pid = None
while time.time() < deadline:
status = get_game_status()
if status["running"]:
pid = status["pid"]
break
time.sleep(0.5)
if pid:
focus_openmw()
return {
"launched": True,
"already_running": False,
"pid": pid or str(proc.pid),
"save": save_path,
"new_game": bool(new_game),
"message": "OpenMW launched with --skip-menu. If loading a save, wait a few seconds before moving.",
}
# ═══════════════════════════════════════
# ACTIONS — CGEvent keypresses
# ═══════════════════════════════════════
def send_key(keycode, duration=0.0, shift=False):
"""Send a keypress to the game via CGEvent."""
import Quartz
flags = Quartz.kCGEventFlagMaskShift if shift else 0
down = Quartz.CGEventCreateKeyboardEvent(None, keycode, True)
Quartz.CGEventSetFlags(down, flags)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, down)
if duration > 0:
time.sleep(duration)
up = Quartz.CGEventCreateKeyboardEvent(None, keycode, False)
Quartz.CGEventSetFlags(up, 0)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, up)
def take_screenshot():
"""Take a screenshot via Quartz."""
import Quartz
import CoreFoundation
image = Quartz.CGDisplayCreateImage(Quartz.CGMainDisplayID())
if not image:
return None
fname = f"morrowind_{int(time.time())}.png"
path = os.path.join(SCREENSHOT_DIR, fname)
url = CoreFoundation.CFURLCreateWithFileSystemPath(None, path, 0, False)
dest = Quartz.CGImageDestinationCreateWithURL(url, "public.png", 1, None)
Quartz.CGImageDestinationAddImage(dest, image, None)
Quartz.CGImageDestinationFinalize(dest)
return path
# ═══════════════════════════════════════
# MCP SERVER
# ═══════════════════════════════════════
app = Server("morrowind")
@app.list_tools()
async def list_tools():
return [
Tool(
name="perceive",
description="Get Timmy's current perception of the game world: position, health, nearby NPCs, doors, items. Updates every 2 seconds from the Lua engine.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="status",
description="Check if Morrowind (OpenMW) is running.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="launch",
description="Launch OpenMW and bypass the main menu. By default loads the newest save with --skip-menu --load-savegame. If new_game=true, starts a new game instead.",
inputSchema={
"type": "object",
"properties": {
"save": {
"type": "string",
"description": "Optional save selector. Use 'latest' (default), an absolute path, or part of a save filename like 'Quicksave'.",
},
"new_game": {
"type": "boolean",
"description": "Start a new game instead of loading a save.",
"default": False,
},
"wait_for_pid": {
"type": "number",
"description": "Seconds to wait for the OpenMW process to appear.",
"default": 15.0,
},
},
"required": [],
},
),
Tool(
name="move",
description="Move the player character. Direction: forward, backward, left, right, turn_left, turn_right. Duration in seconds.",
inputSchema={
"type": "object",
"properties": {
"direction": {
"type": "string",
"enum": ["forward", "backward", "left", "right", "turn_left", "turn_right"],
"description": "Movement direction",
},
"duration": {
"type": "number",
"description": "How long to move in seconds (default: 1.0)",
"default": 1.0,
},
"run": {
"type": "boolean",
"description": "Hold shift to run (default: false)",
"default": False,
},
},
"required": ["direction"],
},
),
Tool(
name="action",
description="Perform a game action: activate (use/interact with what you're looking at), jump, attack, journal, quicksave, quickload, sneak, wait.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["activate", "jump", "attack", "journal", "quicksave", "quickload", "sneak", "wait"],
"description": "Action to perform",
},
},
"required": ["action"],
},
),
Tool(
name="screenshot",
description="Take a screenshot of the game. Returns the file path for vision analysis.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "perceive":
state = parse_latest_perception()
return [TextContent(type="text", text=json.dumps(state, indent=2))]
elif name == "status":
status = get_game_status()
return [TextContent(type="text", text=json.dumps(status, indent=2))]
elif name == "launch":
result = launch_openmw(
save=arguments.get("save"),
new_game=arguments.get("new_game", False),
wait_for_pid=arguments.get("wait_for_pid", 15.0),
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "move":
direction = arguments.get("direction", "forward")
duration = arguments.get("duration", 1.0)
run = arguments.get("run", False)
key_map = {
"forward": "w",
"backward": "s",
"left": "a",
"right": "d",
"turn_left": "left",
"turn_right": "right",
}
key = key_map.get(direction)
if not key:
return [TextContent(type="text", text=f"Unknown direction: {direction}")]
keycode = KEYCODES[key]
send_key(keycode, duration=duration, shift=run)
return [TextContent(type="text", text=f"Moved {direction} for {duration}s" + (" (running)" if run else ""))]
elif name == "action":
action = arguments.get("action")
action_map = {
"activate": ("space", 0.1),
"jump": ("space", 0.05), # tap for jump when moving
"attack": ("f", 0.3), # use key
"journal": ("j", 0.1),
"quicksave": ("f5", 0.1),
"quickload": ("f9", 0.1),
"sneak": ("q", 0.1), # toggle sneak/autowalk depending on config
"wait": ("t", 0.1),
}
if action not in action_map:
return [TextContent(type="text", text=f"Unknown action: {action}")]
key, dur = action_map[action]
send_key(KEYCODES[key], duration=dur)
return [TextContent(type="text", text=f"Performed: {action}")]
elif name == "screenshot":
path = take_screenshot()
if path:
return [TextContent(type="text", text=f"Screenshot saved: {path}")]
else:
return [TextContent(type="text", text="Screenshot failed — display not available")]
return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())