Tracked: morrowind agent (py/cfg), skills/, training-data/, research/, notes/, specs/, test-results/, metrics/, heartbeat/, briefings/, memories/, skins/, hooks/, decisions.md, OPERATIONS.md, SOUL.md Excluded: screenshots, PNGs, binaries, sessions, databases, secrets, audio cache, timmy-config/ and timmy-telemetry/ (separate repos)
304 lines
10 KiB
Python
304 lines
10 KiB
Python
#!/usr/bin/env python3
"""
Morrowind MCP Server — Timmy's game interface.

Exposes Morrowind as tools via MCP (stdio transport).
Perception comes from the OpenMW log file.
Actions go via CGEvent keypresses to the game window.

Register in ~/.hermes/config.yaml:

    mcp_servers:
      morrowind:
        command: "python3"
        args: ["/Users/apayne/.timmy/morrowind/mcp_server.py"]
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
|
|
# MCP SDK
|
|
from mcp.server import Server
|
|
from mcp.server.stdio import stdio_server
|
|
from mcp.types import Tool, TextContent
|
|
|
|
# ═══════════════════════════════════════
|
|
# CONFIG
|
|
# ═══════════════════════════════════════
|
|
|
|
# Log file that perception data is parsed from.
OPENMW_LOG = os.path.expanduser(
    "~/Library/Preferences/openmw/openmw.log"
)

# Directory screenshots are written to; created at import time if it is missing.
SCREENSHOT_DIR = os.path.expanduser(
    "~/.timmy/morrowind/screenshots"
)
os.makedirs(
    SCREENSHOT_DIR,
    exist_ok=True,
)
|
|
|
|
# CGEvent key codes: maps a symbolic key name to the numeric keycode that is
# passed to CGEventCreateKeyboardEvent when a keypress is sent.
KEYCODES = {
    "w": 13, "a": 0, "s": 1, "d": 2,
    "space": 49, "escape": 53, "return": 36,
    "e": 14, "r": 15, "f": 3, "q": 12,
    "t": 17,  # looked up by the "wait" action; was missing, causing a KeyError
    "tab": 48, "1": 18, "2": 19, "3": 20, "4": 21,
    "5": 23, "6": 22, "7": 26, "8": 28, "9": 25,
    "f5": 96, "f9": 101,  # quicksave / quickload
    "backtick": 50,  # console
    "j": 38,  # journal
    "up": 126, "down": 125, "left": 123, "right": 124,
}
|
|
|
|
# ═══════════════════════════════════════
|
|
# PERCEPTION — Parse OpenMW log
|
|
# ═══════════════════════════════════════
|
|
|
|
# (line prefix, state key, collect_into_list) for every recognised perception line.
# Scalar fields overwrite the key; list fields append one entry per matching line.
_PERCEPTION_FIELDS = (
    ("Cell:", "cell", False),
    ("Pos:", "position", False),
    ("Yaw:", "yaw", False),
    ("HP:", "health", False),
    ("MP:", "magicka", False),
    ("FT:", "fatigue", False),
    ("Mode:", "mode", False),
    ("Time:", "game_time", False),
    ("NPC:", "npcs", True),
    ("Door:", "doors", True),
    ("Item:", "items", True),
)


def _parse_perception_block(block):
    """Convert the text between the perception markers into a state dict."""
    state = {"npcs": [], "doors": [], "items": []}

    for raw_line in block.strip().split("\n"):
        line = raw_line.strip()
        # Strip Lua log prefix if present (everything up to the first "]:<TAB>").
        if "]:\t" in line:
            line = line.split("]:\t", 1)[1]

        for prefix, key, is_list in _PERCEPTION_FIELDS:
            if not line.startswith(prefix):
                continue
            value = line[len(prefix):].strip()
            if is_list:
                state[key].append(value)
            else:
                state[key] = value
            break  # Prefixes are mutually exclusive; unrecognised lines are ignored.

    return state


def parse_latest_perception():
    """Parse the most recent perception block from the OpenMW log.

    Returns:
        A dict that always has the list keys ``npcs``, ``doors`` and ``items``
        and, for each matching line found in the block, the string-valued keys
        ``cell``, ``position``, ``yaw``, ``health``, ``magicka``, ``fatigue``,
        ``mode`` and ``game_time``. If the log cannot be read or contains no
        complete perception block, a dict with a single ``error`` key is
        returned instead.
    """
    try:
        # Decode explicitly and tolerate stray bytes: the log is written by
        # another process, and a single undecodable byte must not make
        # perception fail.
        with open(OPENMW_LOG, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except FileNotFoundError:
        return {"error": "OpenMW log not found. Is the game running?"}
    except OSError as exc:
        # E.g. permission problems: report them instead of crashing the caller.
        return {"error": f"Could not read OpenMW log: {exc}"}

    # Find all complete perception blocks; only the last one is current.
    blocks = re.findall(
        r"=== TIMMY PERCEPTION ===\n(.*?)(?:=== END PERCEPTION ===)",
        content, re.DOTALL
    )

    if not blocks:
        return {"error": "No perception data in log. Game may not be running or Lua scripts not loaded."}

    return _parse_perception_block(blocks[-1])
|
|
|
|
|
|
def get_game_status():
    """Report whether ``pgrep -f openmw`` finds a matching process.

    Returns:
        ``{"running": bool, "pid": str | None}`` where ``pid`` is the first
        line of pgrep's output when a match was found, otherwise ``None``.
    """
    probe = subprocess.run(
        ["pgrep", "-f", "openmw"],
        capture_output=True,
        text=True,
    )

    # pgrep exits with status 0 only when at least one process matched.
    if probe.returncode != 0:
        return {"running": False, "pid": None}

    first_pid = probe.stdout.strip().split("\n")[0]
    return {"running": True, "pid": first_pid}
|
|
|
|
|
|
# ═══════════════════════════════════════
|
|
# ACTIONS — CGEvent keypresses
|
|
# ═══════════════════════════════════════
|
|
|
|
def send_key(keycode, duration=0.0, shift=False):
    """Send a keypress to the game via CGEvent.

    Posts a key-down event, optionally holds it for ``duration`` seconds, then
    posts the matching key-up event. Both events are posted at
    ``kCGHIDEventTap``.

    Args:
        keycode: Numeric keycode handed to ``CGEventCreateKeyboardEvent``.
        duration: Seconds to keep the key held; values <= 0 produce a tap.
        shift: When true, the key-down event carries the shift modifier flag.
    """
    import Quartz

    flags = Quartz.kCGEventFlagMaskShift if shift else 0

    down = Quartz.CGEventCreateKeyboardEvent(None, keycode, True)
    Quartz.CGEventSetFlags(down, flags)
    Quartz.CGEventPost(Quartz.kCGHIDEventTap, down)

    try:
        if duration > 0:
            time.sleep(duration)
    finally:
        # Always release the key. Without this, an exception raised while
        # holding (bad duration type, overflow in sleep, interruption) would
        # leave the key logically pressed.
        up = Quartz.CGEventCreateKeyboardEvent(None, keycode, False)
        Quartz.CGEventSetFlags(up, 0)
        Quartz.CGEventPost(Quartz.kCGHIDEventTap, up)
|
|
|
|
|
|
def take_screenshot():
    """Take a screenshot of the main display via Quartz and save it as PNG.

    The file is written to ``SCREENSHOT_DIR`` as ``morrowind_<unix-seconds>.png``.

    Returns:
        The path of the written file, or ``None`` when the display image could
        not be captured or the PNG could not be written.
    """
    import Quartz
    import CoreFoundation

    image = Quartz.CGDisplayCreateImage(Quartz.CGMainDisplayID())
    if not image:
        return None

    fname = f"morrowind_{int(time.time())}.png"
    path = os.path.join(SCREENSHOT_DIR, fname)

    # Path style 0 with isDirectory=False: a plain file URL for the target path.
    url = CoreFoundation.CFURLCreateWithFileSystemPath(None, path, 0, False)
    dest = Quartz.CGImageDestinationCreateWithURL(url, "public.png", 1, None)
    if not dest:
        # Destination could not be created, so there is nothing to write into.
        return None

    Quartz.CGImageDestinationAddImage(dest, image, None)
    # Finalize reports whether the image was actually written; do not hand
    # back a path to a file that does not exist.
    if not Quartz.CGImageDestinationFinalize(dest):
        return None

    return path
|
|
|
|
|
|
# ═══════════════════════════════════════
|
|
# MCP SERVER
|
|
# ═══════════════════════════════════════
|
|
|
|
# The MCP server instance that the tool handlers below are registered on.
app = Server("morrowind")


@app.list_tools()
async def list_tools():
    """Return the tool definitions this server advertises.

    Five tools are described: ``perceive``, ``status``, ``move``, ``action``
    and ``screenshot``. ``move`` and ``action`` take arguments; the other
    three take none.
    """

    def no_arguments():
        # Build a fresh empty-object schema for each tool that takes no input.
        return {"type": "object", "properties": {}, "required": []}

    move_schema = {
        "type": "object",
        "properties": {
            "direction": {
                "type": "string",
                "enum": ["forward", "backward", "left", "right", "turn_left", "turn_right"],
                "description": "Movement direction",
            },
            "duration": {
                "type": "number",
                "description": "How long to move in seconds (default: 1.0)",
                "default": 1.0,
            },
            "run": {
                "type": "boolean",
                "description": "Hold shift to run (default: false)",
                "default": False,
            },
        },
        "required": ["direction"],
    }

    action_schema = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["activate", "jump", "attack", "journal", "quicksave", "quickload", "sneak", "wait"],
                "description": "Action to perform",
            },
        },
        "required": ["action"],
    }

    perceive_tool = Tool(
        name="perceive",
        description="Get Timmy's current perception of the game world: position, health, nearby NPCs, doors, items. Updates every 2 seconds from the Lua engine.",
        inputSchema=no_arguments(),
    )
    status_tool = Tool(
        name="status",
        description="Check if Morrowind (OpenMW) is running.",
        inputSchema=no_arguments(),
    )
    move_tool = Tool(
        name="move",
        description="Move the player character. Direction: forward, backward, left, right, turn_left, turn_right. Duration in seconds.",
        inputSchema=move_schema,
    )
    action_tool = Tool(
        name="action",
        description="Perform a game action: activate (use/interact with what you're looking at), jump, attack, journal, quicksave, quickload, sneak, wait.",
        inputSchema=action_schema,
    )
    screenshot_tool = Tool(
        name="screenshot",
        description="Take a screenshot of the game. Returns the file path for vision analysis.",
        inputSchema=no_arguments(),
    )

    # Order matches the original listing.
    return [perceive_tool, status_tool, move_tool, action_tool, screenshot_tool]
|
|
|
|
|
|
# Movement direction -> symbolic key name in KEYCODES.
_MOVE_KEYS = {
    "forward": "w",
    "backward": "s",
    "left": "a",
    "right": "d",
    "turn_left": "left",
    "turn_right": "right",
}

# Game action -> (symbolic key name in KEYCODES, hold time in seconds).
# NOTE(review): "activate" and "jump" both send space; confirm against the
# game's actual key bindings.
_ACTION_KEYS = {
    "activate": ("space", 0.1),
    "jump": ("space", 0.05),  # tap for jump when moving
    "attack": ("f", 0.3),  # use key
    "journal": ("j", 0.1),
    "quicksave": ("f5", 0.1),
    "quickload": ("f9", 0.1),
    "sneak": ("q", 0.1),  # toggle sneak/autowalk depending on config
    "wait": ("t", 0.1),
}


def _text(message):
    """Wrap a string in the one-element TextContent list every branch returns."""
    return [TextContent(type="text", text=message)]


def _handle_move(arguments):
    """Validate the ``move`` arguments, send the keypress, return the reply text."""
    import math

    direction = arguments.get("direction", "forward")
    duration = arguments.get("duration", 1.0)
    run = arguments.get("run", False)

    key = _MOVE_KEYS.get(direction) if isinstance(direction, str) else None
    if not key:
        return f"Unknown direction: {direction}"

    # Reject durations that would fail or misbehave only after the key-down
    # has been posted: non-numbers, NaN/infinity, and negative values.
    if (
        isinstance(duration, bool)
        or not isinstance(duration, (int, float))
        or not math.isfinite(duration)
        or duration < 0
    ):
        return f"Invalid duration: {duration!r} (expected a non-negative number of seconds)"

    keycode = KEYCODES.get(key)
    if keycode is None:  # 0 is a valid keycode, so compare against None.
        return f"No keycode configured for key: {key}"

    send_key(keycode, duration=duration, shift=run)
    return f"Moved {direction} for {duration}s" + (" (running)" if run else "")


def _handle_action(arguments):
    """Look up the ``action`` argument, send the keypress, return the reply text."""
    action = arguments.get("action")

    if not isinstance(action, str) or action not in _ACTION_KEYS:
        return f"Unknown action: {action}"

    key, dur = _ACTION_KEYS[action]
    keycode = KEYCODES.get(key)
    if keycode is None:
        # A mapped key without a keycode used to raise KeyError; report it.
        return f"No keycode configured for key: {key}"

    send_key(keycode, duration=dur)
    return f"Performed: {action}"


@app.call_tool()
async def call_tool(name: str, arguments: dict):
    """Dispatch an MCP tool call to the matching game operation.

    Args:
        name: Tool name; one of ``perceive``, ``status``, ``move``, ``action``
            or ``screenshot``.
        arguments: Tool arguments; ``None`` is treated as no arguments.

    Returns:
        A one-element list of ``TextContent``. ``perceive`` and ``status``
        return JSON text; the other tools return a short status sentence.
        Unknown tools, directions and actions, and invalid durations, are
        reported in the text rather than raised.
    """
    arguments = arguments or {}

    if name == "perceive":
        return _text(json.dumps(parse_latest_perception(), indent=2))

    if name == "status":
        return _text(json.dumps(get_game_status(), indent=2))

    if name == "move":
        return _text(_handle_move(arguments))

    if name == "action":
        return _text(_handle_action(arguments))

    if name == "screenshot":
        path = take_screenshot()
        if path:
            return _text(f"Screenshot saved: {path}")
        return _text("Screenshot failed — display not available")

    return _text(f"Unknown tool: {name}")
|
|
|
|
|
|
async def main():
    """Run the ``app`` MCP server over the stdio transport."""
    async with stdio_server() as streams:
        # stdio_server yields the (read, write) stream pair that app.run consumes.
        incoming, outgoing = streams
        init_options = app.create_initialization_options()
        await app.run(incoming, outgoing, init_options)
|
|
|
|
|
|
if __name__ == "__main__":
    # asyncio is only needed when the module is executed as a script.
    import asyncio

    server_coroutine = main()
    asyncio.run(server_coroutine)
|