Files
timmy-home/morrowind/mcp_server.py
Alexander Whitestone 0d64d8e559 initial: sovereign home — morrowind agent, skills, training-data, research, specs, notes, operational docs
Tracked: morrowind agent (py/cfg), skills/, training-data/, research/,
notes/, specs/, test-results/, metrics/, heartbeat/, briefings/,
memories/, skins/, hooks/, decisions.md, OPERATIONS.md, SOUL.md

Excluded: screenshots, PNGs, binaries, sessions, databases, secrets,
audio cache, timmy-config/ and timmy-telemetry/ (separate repos)
2026-03-27 13:05:57 -04:00

304 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Morrowind MCP Server — Timmy's game interface.
Exposes Morrowind as tools via MCP (stdio transport).
Perception comes from the OpenMW log file.
Actions go via CGEvent keypresses to the game window.
Register in ~/.hermes/config.yaml:
mcp_servers:
morrowind:
command: "python3"
args: ["/Users/apayne/.timmy/morrowind/mcp_server.py"]
"""
import json
import os
import re
import sys
import time
import subprocess
# MCP SDK
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# ═══════════════════════════════════════
# CONFIG
# ═══════════════════════════════════════
OPENMW_LOG = os.path.expanduser("~/Library/Preferences/openmw/openmw.log")
SCREENSHOT_DIR = os.path.expanduser("~/.timmy/morrowind/screenshots")
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# CGEvent key codes
KEYCODES = {
"w": 13, "a": 0, "s": 1, "d": 2,
"space": 49, "escape": 53, "return": 36,
"e": 14, "r": 15, "f": 3, "q": 12,
"tab": 48, "1": 18, "2": 19, "3": 20, "4": 21,
"5": 23, "6": 22, "7": 26, "8": 28, "9": 25,
"f5": 96, "f9": 101, # quicksave / quickload
"backtick": 50, # console
"j": 38, # journal
"up": 126, "down": 125, "left": 123, "right": 124,
}
# ═══════════════════════════════════════
# PERCEPTION — Parse OpenMW log
# ═══════════════════════════════════════
def parse_latest_perception():
"""Parse the most recent perception block from the OpenMW log."""
try:
with open(OPENMW_LOG, "r") as f:
content = f.read()
except FileNotFoundError:
return {"error": "OpenMW log not found. Is the game running?"}
# Find all perception blocks
blocks = re.findall(
r"=== TIMMY PERCEPTION ===\n(.*?)(?:=== END PERCEPTION ===)",
content, re.DOTALL
)
if not blocks:
return {"error": "No perception data in log. Game may not be running or Lua scripts not loaded."}
# Parse latest block
block = blocks[-1]
state = {
"npcs": [],
"doors": [],
"items": [],
}
for line in block.strip().split("\n"):
line = line.strip()
# Strip Lua log prefix if present
if "]:\t" in line:
line = line.split("]:\t", 1)[1]
if line.startswith("Cell:"):
state["cell"] = line.split(":", 1)[1].strip()
elif line.startswith("Pos:"):
state["position"] = line.split(":", 1)[1].strip()
elif line.startswith("Yaw:"):
state["yaw"] = line.split(":", 1)[1].strip()
elif line.startswith("HP:"):
state["health"] = line.split(":", 1)[1].strip()
elif line.startswith("MP:"):
state["magicka"] = line.split(":", 1)[1].strip()
elif line.startswith("FT:"):
state["fatigue"] = line.split(":", 1)[1].strip()
elif line.startswith("Mode:"):
state["mode"] = line.split(":", 1)[1].strip()
elif line.startswith("Time:"):
state["game_time"] = line.split(":", 1)[1].strip()
elif line.startswith("NPC:"):
state["npcs"].append(line[4:].strip())
elif line.startswith("Door:"):
state["doors"].append(line[5:].strip())
elif line.startswith("Item:"):
state["items"].append(line[5:].strip())
return state
def get_game_status():
"""Check if OpenMW is running."""
result = subprocess.run(["pgrep", "-f", "openmw"], capture_output=True, text=True)
running = result.returncode == 0
return {
"running": running,
"pid": result.stdout.strip().split("\n")[0] if running else None,
}
# ═══════════════════════════════════════
# ACTIONS — CGEvent keypresses
# ═══════════════════════════════════════
def send_key(keycode, duration=0.0, shift=False):
"""Send a keypress to the game via CGEvent."""
import Quartz
flags = Quartz.kCGEventFlagMaskShift if shift else 0
down = Quartz.CGEventCreateKeyboardEvent(None, keycode, True)
Quartz.CGEventSetFlags(down, flags)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, down)
if duration > 0:
time.sleep(duration)
up = Quartz.CGEventCreateKeyboardEvent(None, keycode, False)
Quartz.CGEventSetFlags(up, 0)
Quartz.CGEventPost(Quartz.kCGHIDEventTap, up)
def take_screenshot():
"""Take a screenshot via Quartz."""
import Quartz
import CoreFoundation
image = Quartz.CGDisplayCreateImage(Quartz.CGMainDisplayID())
if not image:
return None
fname = f"morrowind_{int(time.time())}.png"
path = os.path.join(SCREENSHOT_DIR, fname)
url = CoreFoundation.CFURLCreateWithFileSystemPath(None, path, 0, False)
dest = Quartz.CGImageDestinationCreateWithURL(url, "public.png", 1, None)
Quartz.CGImageDestinationAddImage(dest, image, None)
Quartz.CGImageDestinationFinalize(dest)
return path
# ═══════════════════════════════════════
# MCP SERVER
# ═══════════════════════════════════════
app = Server("morrowind")
@app.list_tools()
async def list_tools():
return [
Tool(
name="perceive",
description="Get Timmy's current perception of the game world: position, health, nearby NPCs, doors, items. Updates every 2 seconds from the Lua engine.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="status",
description="Check if Morrowind (OpenMW) is running.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="move",
description="Move the player character. Direction: forward, backward, left, right, turn_left, turn_right. Duration in seconds.",
inputSchema={
"type": "object",
"properties": {
"direction": {
"type": "string",
"enum": ["forward", "backward", "left", "right", "turn_left", "turn_right"],
"description": "Movement direction",
},
"duration": {
"type": "number",
"description": "How long to move in seconds (default: 1.0)",
"default": 1.0,
},
"run": {
"type": "boolean",
"description": "Hold shift to run (default: false)",
"default": False,
},
},
"required": ["direction"],
},
),
Tool(
name="action",
description="Perform a game action: activate (use/interact with what you're looking at), jump, attack, journal, quicksave, quickload, sneak, wait.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["activate", "jump", "attack", "journal", "quicksave", "quickload", "sneak", "wait"],
"description": "Action to perform",
},
},
"required": ["action"],
},
),
Tool(
name="screenshot",
description="Take a screenshot of the game. Returns the file path for vision analysis.",
inputSchema={"type": "object", "properties": {}, "required": []},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "perceive":
state = parse_latest_perception()
return [TextContent(type="text", text=json.dumps(state, indent=2))]
elif name == "status":
status = get_game_status()
return [TextContent(type="text", text=json.dumps(status, indent=2))]
elif name == "move":
direction = arguments.get("direction", "forward")
duration = arguments.get("duration", 1.0)
run = arguments.get("run", False)
key_map = {
"forward": "w",
"backward": "s",
"left": "a",
"right": "d",
"turn_left": "left",
"turn_right": "right",
}
key = key_map.get(direction)
if not key:
return [TextContent(type="text", text=f"Unknown direction: {direction}")]
keycode = KEYCODES[key]
send_key(keycode, duration=duration, shift=run)
return [TextContent(type="text", text=f"Moved {direction} for {duration}s" + (" (running)" if run else ""))]
elif name == "action":
action = arguments.get("action")
action_map = {
"activate": ("space", 0.1),
"jump": ("space", 0.05), # tap for jump when moving
"attack": ("f", 0.3), # use key
"journal": ("j", 0.1),
"quicksave": ("f5", 0.1),
"quickload": ("f9", 0.1),
"sneak": ("q", 0.1), # toggle sneak/autowalk depending on config
"wait": ("t", 0.1),
}
if action not in action_map:
return [TextContent(type="text", text=f"Unknown action: {action}")]
key, dur = action_map[action]
send_key(KEYCODES[key], duration=dur)
return [TextContent(type="text", text=f"Performed: {action}")]
elif name == "screenshot":
path = take_screenshot()
if path:
return [TextContent(type="text", text=f"Screenshot saved: {path}")]
else:
return [TextContent(type="text", text="Screenshot failed — display not available")]
return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())