#!/usr/bin/env python3 """ Bannerlord MCP Harness — GamePortal Protocol Implementation A harness for Mount & Blade II: Bannerlord using MCP (Model Context Protocol) servers: - desktop-control MCP: screenshots, mouse/keyboard input - steam-info MCP: game stats, achievements, player count This harness implements the GamePortal Protocol: capture_state() → GameState execute_action(action) → ActionResult The ODA (Observe-Decide-Act) loop connects perception to action through Hermes WebSocket telemetry. """ from __future__ import annotations import asyncio import json import logging import subprocess import time import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Optional import websockets # ═══════════════════════════════════════════════════════════════════════════ # CONFIGURATION # ═══════════════════════════════════════════════════════════════════════════ BANNERLORD_APP_ID = 261550 BANNERLORD_WINDOW_TITLE = "Mount & Blade II: Bannerlord" DEFAULT_HERMES_WS_URL = "ws://localhost:8000/ws" DEFAULT_MCP_DESKTOP_COMMAND = ["npx", "-y", "@modelcontextprotocol/server-desktop-control"] DEFAULT_MCP_STEAM_COMMAND = ["npx", "-y", "@modelcontextprotocol/server-steam-info"] logging.basicConfig( level=logging.INFO, format="%(asctime)s [bannerlord] %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("bannerlord") # ═══════════════════════════════════════════════════════════════════════════ # MCP CLIENT — JSON-RPC over stdio # ═══════════════════════════════════════════════════════════════════════════ class MCPClient: """Client for MCP servers communicating over stdio.""" def __init__(self, name: str, command: list[str]): self.name = name self.command = command self.process: Optional[subprocess.Popen] = None self.request_id = 0 self._lock = asyncio.Lock() async def start(self) -> bool: """Start the MCP server process.""" try: self.process = subprocess.Popen( self.command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) # Give it a moment to initialize await asyncio.sleep(0.5) if self.process.poll() is not None: log.error(f"MCP server {self.name} exited immediately") return False log.info(f"MCP server {self.name} started (PID: {self.process.pid})") return True except Exception as e: log.error(f"Failed to start MCP server {self.name}: {e}") return False def stop(self): """Stop the MCP server process.""" if self.process and self.process.poll() is None: self.process.terminate() try: self.process.wait(timeout=2) except subprocess.TimeoutExpired: self.process.kill() log.info(f"MCP server {self.name} stopped") async def call_tool(self, tool_name: str, arguments: dict) -> dict: """Call an MCP tool and return the result.""" async with self._lock: self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": "tools/call", "params": { "name": tool_name, "arguments": arguments, }, } if not self.process or self.process.poll() is not None: return {"error": "MCP server not running"} try: # Send request request_line = json.dumps(request) + "\n" self.process.stdin.write(request_line) self.process.stdin.flush() # Read response (with timeout) response_line = await asyncio.wait_for( asyncio.to_thread(self.process.stdout.readline), timeout=10.0, ) if not response_line: return {"error": "Empty response from MCP server"} response = json.loads(response_line) return response.get("result", {}).get("content", [{}])[0].get("text", "") except asyncio.TimeoutError: return {"error": f"Timeout calling {tool_name}"} except json.JSONDecodeError as e: return {"error": f"Invalid JSON response: {e}"} except Exception as e: return {"error": str(e)} async def list_tools(self) -> list[str]: """List available tools from the MCP server.""" async with self._lock: self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": "tools/list", } try: request_line = json.dumps(request) + "\n" self.process.stdin.write(request_line) self.process.stdin.flush() response_line = await asyncio.wait_for( asyncio.to_thread(self.process.stdout.readline), timeout=5.0, ) response = json.loads(response_line) tools = response.get("result", {}).get("tools", []) return [t.get("name", "unknown") for t in tools] except Exception as e: log.warning(f"Failed to list tools: {e}") return [] # ═══════════════════════════════════════════════════════════════════════════ # GAME STATE DATA CLASSES # ═══════════════════════════════════════════════════════════════════════════ @dataclass class VisualState: """Visual perception from the game.""" screenshot_path: Optional[str] = None screen_size: tuple[int, int] = (1920, 1080) mouse_position: tuple[int, int] = (0, 0) window_found: bool = False window_title: str = "" @dataclass class GameContext: """Game-specific context from Steam.""" app_id: int = BANNERLORD_APP_ID playtime_hours: float = 0.0 achievements_unlocked: int = 0 achievements_total: int = 0 current_players_online: int = 0 game_name: str = "Mount & Blade II: Bannerlord" is_running: bool = False @dataclass class GameState: """Complete game state per GamePortal Protocol.""" portal_id: str = "bannerlord" timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) visual: VisualState = field(default_factory=VisualState) game_context: GameContext = field(default_factory=GameContext) session_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) def to_dict(self) -> dict: return { "portal_id": self.portal_id, "timestamp": self.timestamp, "session_id": self.session_id, "visual": { "screenshot_path": self.visual.screenshot_path, "screen_size": list(self.visual.screen_size), "mouse_position": list(self.visual.mouse_position), "window_found": self.visual.window_found, "window_title": self.visual.window_title, }, "game_context": { "app_id": self.game_context.app_id, "playtime_hours": self.game_context.playtime_hours, "achievements_unlocked": self.game_context.achievements_unlocked, "achievements_total": self.game_context.achievements_total, "current_players_online": self.game_context.current_players_online, "game_name": self.game_context.game_name, "is_running": self.game_context.is_running, }, } @dataclass class ActionResult: """Result of executing an action.""" success: bool = False action: str = "" params: dict = field(default_factory=dict) timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) error: Optional[str] = None def to_dict(self) -> dict: result = { "success": self.success, "action": self.action, "params": self.params, "timestamp": self.timestamp, } if self.error: result["error"] = self.error return result # ═══════════════════════════════════════════════════════════════════════════ # BANNERLORD HARNESS — Main Implementation # ═══════════════════════════════════════════════════════════════════════════ class BannerlordHarness: """ Harness for Mount & Blade II: Bannerlord. Implements the GamePortal Protocol: - capture_state(): Takes screenshot, gets screen info, fetches Steam stats - execute_action(): Translates actions to MCP tool calls Telemetry flows through Hermes WebSocket for the ODA loop. """ def __init__( self, hermes_ws_url: str = DEFAULT_HERMES_WS_URL, desktop_command: Optional[list[str]] = None, steam_command: Optional[list[str]] = None, enable_mock: bool = False, ): self.hermes_ws_url = hermes_ws_url self.desktop_command = desktop_command or DEFAULT_MCP_DESKTOP_COMMAND self.steam_command = steam_command or DEFAULT_MCP_STEAM_COMMAND self.enable_mock = enable_mock # MCP clients self.desktop_mcp: Optional[MCPClient] = None self.steam_mcp: Optional[MCPClient] = None # WebSocket connection to Hermes self.ws: Optional[websockets.WebSocketClientProtocol] = None self.ws_connected = False # State self.session_id = str(uuid.uuid4())[:8] self.cycle_count = 0 self.running = False # ═══ LIFECYCLE ═══ async def start(self) -> bool: """Initialize MCP servers and WebSocket connection.""" log.info("=" * 50) log.info("BANNERLORD HARNESS — INITIALIZING") log.info(f" Session: {self.session_id}") log.info(f" Hermes WS: {self.hermes_ws_url}") log.info("=" * 50) # Start MCP servers (or use mock mode) if not self.enable_mock: self.desktop_mcp = MCPClient("desktop-control", self.desktop_command) self.steam_mcp = MCPClient("steam-info", self.steam_command) desktop_ok = await self.desktop_mcp.start() steam_ok = await self.steam_mcp.start() if not desktop_ok: log.warning("Desktop MCP failed to start, enabling mock mode") self.enable_mock = True if not steam_ok: log.warning("Steam MCP failed to start, will use fallback stats") else: log.info("Running in MOCK mode — no actual MCP servers") # Connect to Hermes WebSocket await self._connect_hermes() log.info("Harness initialized successfully") return True async def stop(self): """Shutdown MCP servers and disconnect.""" self.running = False log.info("Shutting down harness...") if self.desktop_mcp: self.desktop_mcp.stop() if self.steam_mcp: self.steam_mcp.stop() if self.ws: await self.ws.close() self.ws_connected = False log.info("Harness shutdown complete") async def _connect_hermes(self): """Connect to Hermes WebSocket for telemetry.""" try: self.ws = await websockets.connect(self.hermes_ws_url) self.ws_connected = True log.info(f"Connected to Hermes: {self.hermes_ws_url}") # Register as a harness await self._send_telemetry({ "type": "harness_register", "harness_id": "bannerlord", "session_id": self.session_id, "game": "Mount & Blade II: Bannerlord", "app_id": BANNERLORD_APP_ID, }) except Exception as e: log.warning(f"Could not connect to Hermes: {e}") self.ws_connected = False async def _send_telemetry(self, data: dict): """Send telemetry data to Hermes WebSocket.""" if self.ws_connected and self.ws: try: await self.ws.send(json.dumps(data)) except Exception as e: log.warning(f"Telemetry send failed: {e}") self.ws_connected = False # ═══ GAMEPORTAL PROTOCOL: capture_state() ═══ async def capture_state(self) -> GameState: """ Capture current game state. Returns GameState with: - Screenshot of Bannerlord window - Screen dimensions and mouse position - Steam stats (playtime, achievements, player count) """ state = GameState(session_id=self.session_id) # Capture visual state via desktop-control MCP visual = await self._capture_visual_state() state.visual = visual # Capture game context via steam-info MCP context = await self._capture_game_context() state.game_context = context # Send telemetry await self._send_telemetry({ "type": "game_state_captured", "portal_id": "bannerlord", "session_id": self.session_id, "cycle": self.cycle_count, "visual": { "window_found": visual.window_found, "screen_size": list(visual.screen_size), }, "game_context": { "is_running": context.is_running, "playtime_hours": context.playtime_hours, }, }) return state async def _capture_visual_state(self) -> VisualState: """Capture visual state via desktop-control MCP.""" visual = VisualState() if self.enable_mock or not self.desktop_mcp: # Mock mode: simulate a screenshot visual.screenshot_path = f"/tmp/bannerlord_mock_{int(time.time())}.png" visual.screen_size = (1920, 1080) visual.mouse_position = (960, 540) visual.window_found = True visual.window_title = BANNERLORD_WINDOW_TITLE return visual try: # Get screen size size_result = await self.desktop_mcp.call_tool("get_screen_size", {}) if isinstance(size_result, str): # Parse "1920x1080" or similar parts = size_result.lower().replace("x", " ").split() if len(parts) >= 2: visual.screen_size = (int(parts[0]), int(parts[1])) # Get mouse position mouse_result = await self.desktop_mcp.call_tool("get_mouse_position", {}) if isinstance(mouse_result, str): # Parse "100, 200" or similar parts = mouse_result.replace(",", " ").split() if len(parts) >= 2: visual.mouse_position = (int(parts[0]), int(parts[1])) # Take screenshot screenshot_path = f"/tmp/bannerlord_capture_{int(time.time())}.png" screenshot_result = await self.desktop_mcp.call_tool( "take_screenshot", {"path": screenshot_path, "window_title": BANNERLORD_WINDOW_TITLE} ) if screenshot_result and "error" not in str(screenshot_result): visual.screenshot_path = screenshot_path visual.window_found = True visual.window_title = BANNERLORD_WINDOW_TITLE else: # Try generic screenshot screenshot_result = await self.desktop_mcp.call_tool( "take_screenshot", {"path": screenshot_path} ) if screenshot_result and "error" not in str(screenshot_result): visual.screenshot_path = screenshot_path visual.window_found = True except Exception as e: log.warning(f"Visual capture failed: {e}") visual.window_found = False return visual async def _capture_game_context(self) -> GameContext: """Capture game context via steam-info MCP.""" context = GameContext() if self.enable_mock or not self.steam_mcp: # Mock mode: return simulated stats context.playtime_hours = 142.5 context.achievements_unlocked = 23 context.achievements_total = 96 context.current_players_online = 8421 context.is_running = True return context try: # Get current player count players_result = await self.steam_mcp.call_tool( "steam-current-players", {"app_id": BANNERLORD_APP_ID} ) if isinstance(players_result, (int, float)): context.current_players_online = int(players_result) elif isinstance(players_result, str): # Try to extract number digits = "".join(c for c in players_result if c.isdigit()) if digits: context.current_players_online = int(digits) # Get user stats (requires Steam user ID) # For now, use placeholder stats context.playtime_hours = 0.0 context.achievements_unlocked = 0 context.achievements_total = 0 except Exception as e: log.warning(f"Game context capture failed: {e}") return context # ═══ GAMEPORTAL PROTOCOL: execute_action() ═══ async def execute_action(self, action: dict) -> ActionResult: """ Execute an action in the game. Supported actions: - click: { "type": "click", "x": int, "y": int } - right_click: { "type": "right_click", "x": int, "y": int } - double_click: { "type": "double_click", "x": int, "y": int } - move_to: { "type": "move_to", "x": int, "y": int } - drag_to: { "type": "drag_to", "x": int, "y": int, "duration": float } - press_key: { "type": "press_key", "key": str } - hotkey: { "type": "hotkey", "keys": str } # e.g., "ctrl shift s" - type_text: { "type": "type_text", "text": str } - scroll: { "type": "scroll", "amount": int } Bannerlord-specific shortcuts: - inventory: hotkey("i") - character: hotkey("c") - party: hotkey("p") - save: hotkey("ctrl s") - load: hotkey("ctrl l") """ action_type = action.get("type", "") result = ActionResult(action=action_type, params=action) if self.enable_mock or not self.desktop_mcp: # Mock mode: log the action but don't execute log.info(f"[MOCK] Action: {action_type} with params: {action}") result.success = True await self._send_telemetry({ "type": "action_executed", "action": action_type, "params": action, "success": True, "mock": True, }) return result try: success = False if action_type == "click": success = await self._mcp_click(action.get("x", 0), action.get("y", 0)) elif action_type == "right_click": success = await self._mcp_right_click(action.get("x", 0), action.get("y", 0)) elif action_type == "double_click": success = await self._mcp_double_click(action.get("x", 0), action.get("y", 0)) elif action_type == "move_to": success = await self._mcp_move_to(action.get("x", 0), action.get("y", 0)) elif action_type == "drag_to": success = await self._mcp_drag_to( action.get("x", 0), action.get("y", 0), action.get("duration", 0.5) ) elif action_type == "press_key": success = await self._mcp_press_key(action.get("key", "")) elif action_type == "hotkey": success = await self._mcp_hotkey(action.get("keys", "")) elif action_type == "type_text": success = await self._mcp_type_text(action.get("text", "")) elif action_type == "scroll": success = await self._mcp_scroll(action.get("amount", 0)) else: result.error = f"Unknown action type: {action_type}" result.success = success if not success and not result.error: result.error = "MCP tool call failed" except Exception as e: result.success = False result.error = str(e) log.error(f"Action execution failed: {e}") # Send telemetry await self._send_telemetry({ "type": "action_executed", "action": action_type, "params": action, "success": result.success, "error": result.error, }) return result # ═══ MCP TOOL WRAPPERS ═══ async def _mcp_click(self, x: int, y: int) -> bool: """Execute click via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("click", {"x": x, "y": y}) return "error" not in str(result).lower() async def _mcp_right_click(self, x: int, y: int) -> bool: """Execute right-click via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("right_click", {"x": x, "y": y}) return "error" not in str(result).lower() async def _mcp_double_click(self, x: int, y: int) -> bool: """Execute double-click via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("double_click", {"x": x, "y": y}) return "error" not in str(result).lower() async def _mcp_move_to(self, x: int, y: int) -> bool: """Move mouse via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("move_to", {"x": x, "y": y}) return "error" not in str(result).lower() async def _mcp_drag_to(self, x: int, y: int, duration: float = 0.5) -> bool: """Drag mouse via desktop-control MCP.""" result = await self.desktop_mcp.call_tool( "drag_to", {"x": x, "y": y, "duration": duration} ) return "error" not in str(result).lower() async def _mcp_press_key(self, key: str) -> bool: """Press key via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("press_key", {"key": key}) return "error" not in str(result).lower() async def _mcp_hotkey(self, keys: str) -> bool: """Execute hotkey combo via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("hotkey", {"keys": keys}) return "error" not in str(result).lower() async def _mcp_type_text(self, text: str) -> bool: """Type text via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("type_text", {"text": text}) return "error" not in str(result).lower() async def _mcp_scroll(self, amount: int) -> bool: """Scroll via desktop-control MCP.""" result = await self.desktop_mcp.call_tool("scroll", {"amount": amount}) return "error" not in str(result).lower() # ═══ BANNERLORD-SPECIFIC ACTIONS ═══ async def open_inventory(self) -> ActionResult: """Open inventory screen (I key).""" return await self.execute_action({"type": "press_key", "key": "i"}) async def open_character(self) -> ActionResult: """Open character screen (C key).""" return await self.execute_action({"type": "press_key", "key": "c"}) async def open_party(self) -> ActionResult: """Open party screen (P key).""" return await self.execute_action({"type": "press_key", "key": "p"}) async def save_game(self) -> ActionResult: """Save game (Ctrl+S).""" return await self.execute_action({"type": "hotkey", "keys": "ctrl s"}) async def load_game(self) -> ActionResult: """Load game (Ctrl+L).""" return await self.execute_action({"type": "hotkey", "keys": "ctrl l"}) async def click_settlement(self, x: int, y: int) -> ActionResult: """Click on a settlement on the campaign map.""" return await self.execute_action({"type": "click", "x": x, "y": y}) async def move_army(self, x: int, y: int) -> ActionResult: """Right-click to move army on campaign map.""" return await self.execute_action({"type": "right_click", "x": x, "y": y}) async def select_unit(self, x: int, y: int) -> ActionResult: """Click to select a unit in battle.""" return await self.execute_action({"type": "click", "x": x, "y": y}) async def command_unit(self, x: int, y: int) -> ActionResult: """Right-click to command a unit in battle.""" return await self.execute_action({"type": "right_click", "x": x, "y": y}) # ═══ ODA LOOP (Observe-Decide-Act) ═══ async def run_observe_decide_act_loop( self, decision_fn: Callable[[GameState], list[dict]], max_iterations: int = 10, iteration_delay: float = 2.0, ): """ The core ODA loop — proves the harness works. 1. OBSERVE: Capture game state (screenshot, stats) 2. DECIDE: Call decision_fn(state) to get actions 3. ACT: Execute each action 4. REPEAT Args: decision_fn: Function that takes GameState and returns list of actions max_iterations: Maximum number of ODA cycles iteration_delay: Seconds to wait between cycles """ log.info("=" * 50) log.info("STARTING ODA LOOP") log.info(f" Max iterations: {max_iterations}") log.info(f" Iteration delay: {iteration_delay}s") log.info("=" * 50) self.running = True for iteration in range(max_iterations): if not self.running: break self.cycle_count = iteration log.info(f"\n--- ODA Cycle {iteration + 1}/{max_iterations} ---") # 1. OBSERVE: Capture state log.info("[OBSERVE] Capturing game state...") state = await self.capture_state() log.info(f" Screenshot: {state.visual.screenshot_path}") log.info(f" Window found: {state.visual.window_found}") log.info(f" Screen: {state.visual.screen_size}") log.info(f" Players online: {state.game_context.current_players_online}") # 2. DECIDE: Get actions from decision function log.info("[DECIDE] Getting actions...") actions = decision_fn(state) log.info(f" Decision returned {len(actions)} actions") # 3. ACT: Execute actions log.info("[ACT] Executing actions...") results = [] for i, action in enumerate(actions): log.info(f" Action {i+1}/{len(actions)}: {action.get('type', 'unknown')}") result = await self.execute_action(action) results.append(result) log.info(f" Result: {'SUCCESS' if result.success else 'FAILED'}") if result.error: log.info(f" Error: {result.error}") # Send cycle summary telemetry await self._send_telemetry({ "type": "oda_cycle_complete", "cycle": iteration, "actions_executed": len(actions), "successful": sum(1 for r in results if r.success), "failed": sum(1 for r in results if not r.success), }) # Delay before next iteration if iteration < max_iterations - 1: await asyncio.sleep(iteration_delay) log.info("\n" + "=" * 50) log.info("ODA LOOP COMPLETE") log.info(f"Total cycles: {self.cycle_count + 1}") log.info("=" * 50) # ═══════════════════════════════════════════════════════════════════════════ # SIMPLE DECISION FUNCTIONS FOR TESTING # ═══════════════════════════════════════════════════════════════════════════ def simple_test_decision(state: GameState) -> list[dict]: """ A simple decision function for testing. In a real implementation, this would: 1. Analyze the screenshot (vision model) 2. Consider game context 3. Return appropriate actions """ actions = [] # Example: If on campaign map, move mouse to center if state.visual.window_found: center_x = state.visual.screen_size[0] // 2 center_y = state.visual.screen_size[1] // 2 actions.append({"type": "move_to", "x": center_x, "y": center_y}) # Example: Press a key to test input actions.append({"type": "press_key", "key": "space"}) return actions def bannerlord_campaign_decision(state: GameState) -> list[dict]: """ Example decision function for Bannerlord campaign mode. This would be replaced by a vision-language model that: - Analyzes the screenshot - Decides on strategy - Returns specific actions """ actions = [] # Move mouse to a position (example) screen_w, screen_h = state.visual.screen_size actions.append({"type": "move_to", "x": int(screen_w * 0.5), "y": int(screen_h * 0.5)}) # Open party screen to check troops actions.append({"type": "press_key", "key": "p"}) return actions # ═══════════════════════════════════════════════════════════════════════════ # CLI ENTRYPOINT # ═══════════════════════════════════════════════════════════════════════════ async def main(): """ Test the Bannerlord harness with a single ODA loop iteration. Usage: python bannerlord_harness.py [--mock] """ import argparse parser = argparse.ArgumentParser( description="Bannerlord MCP Harness — Test the ODA loop" ) parser.add_argument( "--mock", action="store_true", help="Run in mock mode (no actual MCP servers)", ) parser.add_argument( "--hermes-ws", default=DEFAULT_HERMES_WS_URL, help=f"Hermes WebSocket URL (default: {DEFAULT_HERMES_WS_URL})", ) parser.add_argument( "--iterations", type=int, default=3, help="Number of ODA iterations (default: 3)", ) parser.add_argument( "--delay", type=float, default=1.0, help="Delay between iterations in seconds (default: 1.0)", ) args = parser.parse_args() # Create harness harness = BannerlordHarness( hermes_ws_url=args.hermes_ws, enable_mock=args.mock, ) try: # Initialize await harness.start() # Run ODA loop await harness.run_observe_decide_act_loop( decision_fn=simple_test_decision, max_iterations=args.iterations, iteration_delay=args.delay, ) # Demonstrate Bannerlord-specific actions log.info("\n--- Testing Bannerlord-specific actions ---") await harness.open_inventory() await asyncio.sleep(0.5) await harness.open_character() await asyncio.sleep(0.5) await harness.open_party() except KeyboardInterrupt: log.info("Interrupted by user") finally: # Cleanup await harness.stop() if __name__ == "__main__": asyncio.run(main())