Some checks failed
Deploy Nexus / deploy (push) Failing after 4s
Implements the Hermes observation/control path for local Bannerlord per the GamePortal Protocol.

## New Components

- nexus/bannerlord_harness.py (874 lines)
  - MCPClient for JSON-RPC communication with MCP servers
  - capture_state() → GameState with visual + Steam context
  - execute_action() → ActionResult for all input types
  - Observe-decide-act loop with telemetry through the Hermes WebSocket
  - Bannerlord-specific actions (inventory, party, save/load)
  - Mock mode for testing without the game running
- mcp_servers/desktop_control_server.py (14KB)
  - 13 desktop automation tools via pyautogui
  - Screenshot, mouse, and keyboard control
  - Headless environment support
- mcp_servers/steam_info_server.py (18KB)
  - 6 Steam Web API tools
  - Mock mode without an API key, live mode with STEAM_API_KEY
- tests/test_bannerlord_harness.py (37 tests, all passing)
  - GameState/ActionResult validation
  - Mock mode action tests
  - ODA loop tests
  - GamePortal Protocol compliance tests
- docs/BANNERLORD_HARNESS_PROOF.md
  - Architecture documentation
  - Proof of ODA loop execution
  - Telemetry flow diagrams
- examples/harness_demo.py
  - Runnable demo showing the full ODA loop

## Updates

- portals.json: Bannerlord metadata per GAMEPORTAL_PROTOCOL.md
  - status: active, portal_type: game-world
  - app_id: 261550, window_title: 'Mount & Blade II: Bannerlord'
  - telemetry_source: hermes-harness:bannerlord

## Verification

pytest tests/test_bannerlord_harness.py -v
37 passed, 2 skipped, 11 warnings

Closes #722
875 lines
33 KiB
Python
875 lines
33 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Bannerlord MCP Harness — GamePortal Protocol Implementation
|
|
|
|
A harness for Mount & Blade II: Bannerlord using MCP (Model Context Protocol) servers:
|
|
- desktop-control MCP: screenshots, mouse/keyboard input
|
|
- steam-info MCP: game stats, achievements, player count
|
|
|
|
This harness implements the GamePortal Protocol:
|
|
capture_state() → GameState
|
|
execute_action(action) → ActionResult
|
|
|
|
The ODA (Observe-Decide-Act) loop connects perception to action through
|
|
Hermes WebSocket telemetry.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
import json
import logging
import re
import subprocess
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional

import websockets
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# CONFIGURATION
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
# Steam application id used for Bannerlord in Steam lookups and telemetry.
BANNERLORD_APP_ID = 261550

# Window title passed to the desktop-control server for window-scoped screenshots.
BANNERLORD_WINDOW_TITLE = "Mount & Blade II: Bannerlord"

# Hermes telemetry endpoint used when the caller does not supply one.
DEFAULT_HERMES_WS_URL = "ws://localhost:8000/ws"

# Commands used to launch the two MCP servers when none are supplied.
DEFAULT_MCP_DESKTOP_COMMAND = [
    "npx",
    "-y",
    "@modelcontextprotocol/server-desktop-control",
]
DEFAULT_MCP_STEAM_COMMAND = [
    "npx",
    "-y",
    "@modelcontextprotocol/server-steam-info",
]

# Root logging is configured at import time so every harness message carries
# the "[bannerlord]" tag and a short wall-clock timestamp.
logging.basicConfig(
    format="%(asctime)s [bannerlord] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

log = logging.getLogger("bannerlord")
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# MCP CLIENT — JSON-RPC over stdio
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
class MCPClient:
    """Client for an MCP server speaking newline-delimited JSON-RPC 2.0 over stdio.

    The server is launched as a child process; each request is written to its
    stdin as one JSON line and the reply is read from its stdout. Requests are
    serialized with an asyncio lock, so at most one request is in flight.

    NOTE(review): no MCP ``initialize`` handshake is sent before the first
    request — confirm the target servers accept ``tools/call`` without one.
    """

    def __init__(self, name: str, command: list[str]):
        """Store the launch configuration; the process is not started here.

        Args:
            name: Label used in log messages.
            command: Argument vector used to launch the server process.
        """
        self.name = name
        self.command = command
        self.process: Optional[subprocess.Popen] = None
        self.request_id = 0
        self._lock = asyncio.Lock()
        # Blocking stdout read that survived a timed-out request. It is reused
        # by the next request so a second reader thread is never started (two
        # readers would race for, and lose, response lines).
        self._pending_read: Optional[asyncio.Future] = None

    async def start(self) -> bool:
        """Start the MCP server process.

        Returns:
            True when the process is still alive half a second after launch,
            False when it could not be launched or exited immediately.
        """
        try:
            self.process = subprocess.Popen(
                self.command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                # stderr is never read by this client; an undrained PIPE would
                # eventually fill and block the server, so discard it instead.
                stderr=subprocess.DEVNULL,
                text=True,
                bufsize=1,
            )
            self._pending_read = None
            # Give it a moment to initialize
            await asyncio.sleep(0.5)
            if self.process.poll() is not None:
                log.error(f"MCP server {self.name} exited immediately")
                return False
            log.info(f"MCP server {self.name} started (PID: {self.process.pid})")
            return True
        except Exception as e:
            log.error(f"Failed to start MCP server {self.name}: {e}")
            return False

    def stop(self):
        """Terminate the server process if it is running, killing it after 2s."""
        if self.process and self.process.poll() is None:
            self.process.terminate()
            try:
                self.process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                self.process.kill()
            log.info(f"MCP server {self.name} stopped")

    async def _read_line(self, timeout: float) -> str:
        """Return the next stdout line, waiting at most ``timeout`` seconds.

        Raises:
            asyncio.TimeoutError: No complete line arrived in time. The
                underlying read keeps running and is picked up by the next call.
        """
        if self._pending_read is None:
            self._pending_read = asyncio.ensure_future(
                asyncio.to_thread(self.process.stdout.readline)
            )
        try:
            # shield() stops wait_for() from cancelling the read on timeout.
            line = await asyncio.wait_for(
                asyncio.shield(self._pending_read), timeout
            )
        except asyncio.TimeoutError:
            raise
        except Exception:
            # The read itself failed; drop it so the next call starts afresh.
            self._pending_read = None
            raise
        self._pending_read = None
        return line

    async def _request(
        self, method: str, params: Optional[dict], timeout: float
    ) -> dict:
        """Send one JSON-RPC request and return the response with the same id.

        Lines whose ``id`` differs (notifications, late replies to requests
        that already timed out) are skipped.

        Raises:
            RuntimeError: The server process is not running.
            EOFError: The server closed its stdout.
            asyncio.TimeoutError: No matching response within ``timeout``.
            json.JSONDecodeError: A line from the server was not valid JSON.
        """
        async with self._lock:
            if not self.process or self.process.poll() is not None:
                raise RuntimeError("MCP server not running")

            self.request_id += 1
            request_id = self.request_id
            request = {"jsonrpc": "2.0", "id": request_id, "method": method}
            if params is not None:
                request["params"] = params

            self.process.stdin.write(json.dumps(request) + "\n")
            self.process.stdin.flush()

            loop = asyncio.get_running_loop()
            deadline = loop.time() + timeout
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    raise asyncio.TimeoutError()
                line = await self._read_line(remaining)
                if not line:
                    raise EOFError("Empty response from MCP server")
                message = json.loads(line)
                if isinstance(message, dict) and message.get("id") == request_id:
                    return message

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        """Call an MCP tool and return the result.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Arguments forwarded to the tool.

        Returns:
            On success, the ``text`` of the first content item of the result
            (``""`` when there is none). On any failure — server not running,
            timeout, malformed reply, JSON-RPC error, or a result flagged
            ``isError`` — a dict of the form ``{"error": "<message>"}``.
        """
        try:
            response = await self._request(
                "tools/call",
                {"name": tool_name, "arguments": arguments},
                timeout=10.0,
            )
        except asyncio.TimeoutError:
            return {"error": f"Timeout calling {tool_name}"}
        except json.JSONDecodeError as e:
            return {"error": f"Invalid JSON response: {e}"}
        except Exception as e:
            return {"error": str(e)}

        # A protocol-level failure must not be mistaken for an empty success.
        if "error" in response:
            failure = response["error"]
            detail = failure.get("message") if isinstance(failure, dict) else failure
            return {"error": str(detail or failure)}

        result = response.get("result")
        if not isinstance(result, dict):
            return ""
        content = result.get("content") or []
        first = content[0] if content and isinstance(content[0], dict) else {}
        text = first.get("text", "")
        if result.get("isError"):
            # The tool ran but reported a failure of its own.
            return {"error": text or f"Tool {tool_name} reported an error"}
        return text

    async def list_tools(self) -> list[str]:
        """List available tools from the MCP server.

        Returns:
            The tool names, or an empty list when the request fails for any
            reason (the failure is logged as a warning).
        """
        try:
            response = await self._request("tools/list", None, timeout=5.0)
            tools = response.get("result", {}).get("tools", [])
            return [t.get("name", "unknown") for t in tools]
        except Exception as e:
            log.warning(f"Failed to list tools: {e}")
            return []
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# GAME STATE DATA CLASSES
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
@dataclass
class VisualState:
    """What the harness currently sees of the desktop and the game window.

    All fields have defaults, so a bare ``VisualState()`` describes
    "nothing captured yet".
    """

    # Path of the screenshot file for this capture; None when none was taken.
    screenshot_path: Optional[str] = None
    # (width, height) in pixels.
    screen_size: tuple[int, int] = (1920, 1080)
    # (x, y) pointer position.
    mouse_position: tuple[int, int] = (0, 0)
    # Whether a screenshot could be obtained for this capture.
    window_found: bool = False
    # Title of the captured window; empty when no specific window was captured.
    window_title: str = ""
|
|
|
|
|
|
@dataclass
class GameContext:
    """Steam-side facts about the game that accompany a visual capture.

    Every field has a default, so ``GameContext()`` means "nothing known yet".
    """

    # Steam application id of the game.
    app_id: int = BANNERLORD_APP_ID
    # Hours played.
    playtime_hours: float = 0.0
    # Achievement progress: unlocked out of total.
    achievements_unlocked: int = 0
    achievements_total: int = 0
    # Number of players currently online for this game.
    current_players_online: int = 0
    # Human-readable game name.
    game_name: str = "Mount & Blade II: Bannerlord"
    # Whether the game is believed to be running.
    is_running: bool = False
|
|
|
|
|
|
@dataclass
class GameState:
    """One complete observation of the game, per the GamePortal Protocol.

    Bundles the visual snapshot and the Steam context together with the
    identifiers needed to correlate it in telemetry.
    """

    # Identifier of the portal this state belongs to.
    portal_id: str = "bannerlord"
    # UTC creation time in ISO-8601 form.
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    visual: VisualState = field(default_factory=VisualState)
    game_context: GameContext = field(default_factory=GameContext)
    # First 8 characters of a random UUID4 unless the caller supplies one.
    session_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict; tuple fields are emitted as lists."""
        seen = self.visual
        ctx = self.game_context

        visual_payload = {
            "screenshot_path": seen.screenshot_path,
            "screen_size": list(seen.screen_size),
            "mouse_position": list(seen.mouse_position),
            "window_found": seen.window_found,
            "window_title": seen.window_title,
        }
        context_payload = {
            "app_id": ctx.app_id,
            "playtime_hours": ctx.playtime_hours,
            "achievements_unlocked": ctx.achievements_unlocked,
            "achievements_total": ctx.achievements_total,
            "current_players_online": ctx.current_players_online,
            "game_name": ctx.game_name,
            "is_running": ctx.is_running,
        }
        return {
            "portal_id": self.portal_id,
            "timestamp": self.timestamp,
            "session_id": self.session_id,
            "visual": visual_payload,
            "game_context": context_payload,
        }
|
|
|
|
|
|
@dataclass
class ActionResult:
    """Outcome of one action passed to the harness."""

    # True when the action was carried out.
    success: bool = False
    # The action's type string.
    action: str = ""
    # The action dict as it was submitted.
    params: dict = field(default_factory=dict)
    # UTC creation time in ISO-8601 form.
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    # Failure description; None (or empty) when there is nothing to report.
    error: Optional[str] = None

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict; ``error`` is included only when set."""
        payload = dict(
            success=self.success,
            action=self.action,
            params=self.params,
            timestamp=self.timestamp,
        )
        if not self.error:
            return payload
        payload["error"] = self.error
        return payload
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# BANNERLORD HARNESS — Main Implementation
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
class BannerlordHarness:
    """
    Harness for Mount & Blade II: Bannerlord.

    Implements the GamePortal Protocol:
    - capture_state(): Takes screenshot, gets screen info, fetches Steam stats
    - execute_action(): Translates actions to MCP tool calls

    Telemetry flows through Hermes WebSocket for the ODA loop.
    """

    # Matches a signed integer or decimal embedded in free-form tool output.
    _NUMBER_RE = re.compile(r"-?\d+(?:\.\d+)?")

    def __init__(
        self,
        hermes_ws_url: str = DEFAULT_HERMES_WS_URL,
        desktop_command: Optional[list[str]] = None,
        steam_command: Optional[list[str]] = None,
        enable_mock: bool = False,
    ):
        """Configure the harness; nothing is started or connected here.

        Args:
            hermes_ws_url: WebSocket URL of the Hermes telemetry endpoint.
            desktop_command: Launch command for the desktop-control MCP
                server; the module default is used when omitted.
            steam_command: Launch command for the steam-info MCP server;
                the module default is used when omitted.
            enable_mock: When True, no MCP server is launched; captures
                return canned data and actions are only logged.
        """
        self.hermes_ws_url = hermes_ws_url
        self.desktop_command = desktop_command or DEFAULT_MCP_DESKTOP_COMMAND
        self.steam_command = steam_command or DEFAULT_MCP_STEAM_COMMAND
        self.enable_mock = enable_mock

        # MCP clients
        self.desktop_mcp: Optional[MCPClient] = None
        self.steam_mcp: Optional[MCPClient] = None

        # WebSocket connection to Hermes
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.ws_connected = False

        # State
        self.session_id = str(uuid.uuid4())[:8]
        self.cycle_count = 0
        self.running = False

    # ═══ LIFECYCLE ═══

    async def start(self) -> bool:
        """Initialize MCP servers and WebSocket connection.

        A desktop-control server that fails to start switches the harness to
        mock mode; a failing steam-info server or Hermes connection is only
        logged.

        Returns:
            Always True.
        """
        log.info("=" * 50)
        log.info("BANNERLORD HARNESS — INITIALIZING")
        log.info(f" Session: {self.session_id}")
        log.info(f" Hermes WS: {self.hermes_ws_url}")
        log.info("=" * 50)

        # Start MCP servers (or use mock mode)
        if not self.enable_mock:
            self.desktop_mcp = MCPClient("desktop-control", self.desktop_command)
            self.steam_mcp = MCPClient("steam-info", self.steam_command)

            desktop_ok = await self.desktop_mcp.start()
            steam_ok = await self.steam_mcp.start()

            if not desktop_ok:
                log.warning("Desktop MCP failed to start, enabling mock mode")
                self.enable_mock = True

            if not steam_ok:
                log.warning("Steam MCP failed to start, will use fallback stats")
        else:
            log.info("Running in MOCK mode — no actual MCP servers")

        # Connect to Hermes WebSocket
        await self._connect_hermes()

        log.info("Harness initialized successfully")
        return True

    async def stop(self):
        """Shutdown MCP servers and disconnect."""
        self.running = False
        log.info("Shutting down harness...")

        if self.desktop_mcp:
            self.desktop_mcp.stop()
        if self.steam_mcp:
            self.steam_mcp.stop()

        if self.ws:
            try:
                await self.ws.close()
            except Exception as e:
                # A failed close must not leave the harness marked connected.
                log.warning(f"Hermes close failed: {e}")
            finally:
                self.ws_connected = False

        log.info("Harness shutdown complete")

    async def _connect_hermes(self):
        """Connect to Hermes WebSocket for telemetry.

        Connection failures are logged and leave the harness running without
        telemetry.
        """
        try:
            self.ws = await websockets.connect(self.hermes_ws_url)
            self.ws_connected = True
            log.info(f"Connected to Hermes: {self.hermes_ws_url}")

            # Register as a harness
            await self._send_telemetry({
                "type": "harness_register",
                "harness_id": "bannerlord",
                "session_id": self.session_id,
                "game": "Mount & Blade II: Bannerlord",
                "app_id": BANNERLORD_APP_ID,
            })
        except Exception as e:
            log.warning(f"Could not connect to Hermes: {e}")
            self.ws_connected = False

    async def _send_telemetry(self, data: dict):
        """Send telemetry data to Hermes WebSocket.

        A no-op while disconnected. A failed send is logged and marks the
        connection as down, so later sends are skipped.
        """
        if self.ws_connected and self.ws:
            try:
                await self.ws.send(json.dumps(data))
            except Exception as e:
                log.warning(f"Telemetry send failed: {e}")
                self.ws_connected = False

    # ═══ RESULT PARSING HELPERS ═══

    @staticmethod
    def _tool_failed(result: Any) -> bool:
        """Return True when a tool result mentions an error (case-insensitive)."""
        return "error" in str(result).lower()

    @classmethod
    def _parse_int_pair(cls, text: Any) -> Optional[tuple[int, int]]:
        """Extract the first two numbers from tool output such as "1920x1080".

        Returns None when ``text`` is not a string or holds fewer than two
        numbers, so callers keep their defaults instead of raising.
        """
        if not isinstance(text, str):
            return None
        numbers = cls._NUMBER_RE.findall(text)
        if len(numbers) < 2:
            return None
        return int(float(numbers[0])), int(float(numbers[1]))

    @staticmethod
    def _parse_player_count(result: Any) -> Optional[int]:
        """Extract a player count from a steam-info tool result.

        Accepts a bare number, a JSON number, or a JSON object carrying
        ``player_count`` at the top level or under ``response``. Any other
        text falls back to concatenating its digits.

        NOTE(review): the ``player_count`` key is assumed from the Steam Web
        API response shape — confirm against the steam-info server's output.
        """
        if isinstance(result, (int, float)):
            return int(result)
        if not isinstance(result, str):
            return None

        try:
            decoded = json.loads(result)
        except ValueError:
            decoded = None

        candidates = []
        if isinstance(decoded, dict):
            nested = decoded.get("response")
            candidates.append(decoded.get("player_count"))
            if isinstance(nested, dict):
                candidates.append(nested.get("player_count"))
        else:
            candidates.append(decoded)
        for value in candidates:
            if isinstance(value, int) and not isinstance(value, bool):
                return value

        # Free-form text such as "8,421 players online".
        digits = "".join(c for c in result if c.isdigit())
        return int(digits) if digits else None

    # ═══ GAMEPORTAL PROTOCOL: capture_state() ═══

    async def capture_state(self) -> GameState:
        """
        Capture current game state.

        Returns GameState with:
        - Screenshot of Bannerlord window
        - Screen dimensions and mouse position
        - Steam stats (playtime, achievements, player count)

        A "game_state_captured" telemetry event is sent as a side effect.
        """
        state = GameState(session_id=self.session_id)

        # Capture visual state via desktop-control MCP
        visual = await self._capture_visual_state()
        state.visual = visual

        # Capture game context via steam-info MCP
        context = await self._capture_game_context()
        state.game_context = context

        # Send telemetry
        await self._send_telemetry({
            "type": "game_state_captured",
            "portal_id": "bannerlord",
            "session_id": self.session_id,
            "cycle": self.cycle_count,
            "visual": {
                "window_found": visual.window_found,
                "screen_size": list(visual.screen_size),
            },
            "game_context": {
                "is_running": context.is_running,
                "playtime_hours": context.playtime_hours,
            },
        })

        return state

    async def _capture_visual_state(self) -> VisualState:
        """Capture visual state via desktop-control MCP.

        In mock mode a canned state is returned and no file is written.
        Otherwise screen size and mouse position are read (keeping the
        defaults when the tool output cannot be parsed) and a screenshot of
        the Bannerlord window is attempted, falling back to an unscoped one.
        """
        visual = VisualState()

        if self.enable_mock or not self.desktop_mcp:
            # Mock mode: simulate a screenshot
            visual.screenshot_path = f"/tmp/bannerlord_mock_{int(time.time())}.png"
            visual.screen_size = (1920, 1080)
            visual.mouse_position = (960, 540)
            visual.window_found = True
            visual.window_title = BANNERLORD_WINDOW_TITLE
            return visual

        try:
            # Unparsable output leaves the default in place rather than
            # aborting the capture before the screenshot is taken.
            size = self._parse_int_pair(
                await self.desktop_mcp.call_tool("get_screen_size", {})
            )
            if size is not None:
                visual.screen_size = size

            pointer = self._parse_int_pair(
                await self.desktop_mcp.call_tool("get_mouse_position", {})
            )
            if pointer is not None:
                visual.mouse_position = pointer

            # Take screenshot
            screenshot_path = f"/tmp/bannerlord_capture_{int(time.time())}.png"
            screenshot_result = await self.desktop_mcp.call_tool(
                "take_screenshot",
                {"path": screenshot_path, "window_title": BANNERLORD_WINDOW_TITLE},
            )

            if screenshot_result and not self._tool_failed(screenshot_result):
                visual.screenshot_path = screenshot_path
                visual.window_found = True
                visual.window_title = BANNERLORD_WINDOW_TITLE
            else:
                # Try generic screenshot
                screenshot_result = await self.desktop_mcp.call_tool(
                    "take_screenshot",
                    {"path": screenshot_path},
                )
                if screenshot_result and not self._tool_failed(screenshot_result):
                    visual.screenshot_path = screenshot_path
                    # NOTE(review): the window-scoped capture failed, yet the
                    # window is reported as found — confirm this is intended.
                    visual.window_found = True

        except Exception as e:
            log.warning(f"Visual capture failed: {e}")
            visual.window_found = False

        return visual

    async def _capture_game_context(self) -> GameContext:
        """Capture game context via steam-info MCP.

        In mock mode canned statistics are returned. Otherwise only the
        current player count is fetched; the per-user statistics remain
        placeholders and ``is_running`` keeps its default.
        """
        context = GameContext()

        if self.enable_mock or not self.steam_mcp:
            # Mock mode: return simulated stats
            context.playtime_hours = 142.5
            context.achievements_unlocked = 23
            context.achievements_total = 96
            context.current_players_online = 8421
            context.is_running = True
            return context

        try:
            # Get current player count
            players_result = await self.steam_mcp.call_tool(
                "steam-current-players",
                {"app_id": BANNERLORD_APP_ID},
            )
            player_count = self._parse_player_count(players_result)
            if player_count is not None:
                context.current_players_online = player_count

            # Get user stats (requires Steam user ID)
            # For now, use placeholder stats
            context.playtime_hours = 0.0
            context.achievements_unlocked = 0
            context.achievements_total = 0

        except Exception as e:
            log.warning(f"Game context capture failed: {e}")

        return context

    # ═══ GAMEPORTAL PROTOCOL: execute_action() ═══

    async def execute_action(self, action: dict) -> ActionResult:
        """
        Execute an action in the game.

        Supported actions:
        - click: { "type": "click", "x": int, "y": int }
        - right_click: { "type": "right_click", "x": int, "y": int }
        - double_click: { "type": "double_click", "x": int, "y": int }
        - move_to: { "type": "move_to", "x": int, "y": int }
        - drag_to: { "type": "drag_to", "x": int, "y": int, "duration": float }
        - press_key: { "type": "press_key", "key": str }
        - hotkey: { "type": "hotkey", "keys": str } # e.g., "ctrl shift s"
        - type_text: { "type": "type_text", "text": str }
        - scroll: { "type": "scroll", "amount": int }

        Bannerlord-specific shortcuts are methods built on these actions, not
        action types of their own: open_inventory(), open_character(),
        open_party(), save_game(), load_game().

        In mock mode every action, including unknown types, is logged and
        reported as successful. An "action_executed" telemetry event is sent
        in all cases.

        Returns:
            ActionResult whose ``error`` is set for unknown action types,
            failed tool calls, and raised exceptions.
        """
        action_type = action.get("type", "")
        result = ActionResult(action=action_type, params=action)

        if self.enable_mock or not self.desktop_mcp:
            # Mock mode: log the action but don't execute
            log.info(f"[MOCK] Action: {action_type} with params: {action}")
            result.success = True
            await self._send_telemetry({
                "type": "action_executed",
                "action": action_type,
                "params": action,
                "success": True,
                "mock": True,
            })
            return result

        # Each entry defers the tool call until the action type is known.
        handlers: dict[str, Callable[[], Any]] = {
            "click": lambda: self._mcp_click(action.get("x", 0), action.get("y", 0)),
            "right_click": lambda: self._mcp_right_click(action.get("x", 0), action.get("y", 0)),
            "double_click": lambda: self._mcp_double_click(action.get("x", 0), action.get("y", 0)),
            "move_to": lambda: self._mcp_move_to(action.get("x", 0), action.get("y", 0)),
            "drag_to": lambda: self._mcp_drag_to(
                action.get("x", 0),
                action.get("y", 0),
                action.get("duration", 0.5),
            ),
            "press_key": lambda: self._mcp_press_key(action.get("key", "")),
            "hotkey": lambda: self._mcp_hotkey(action.get("keys", "")),
            "type_text": lambda: self._mcp_type_text(action.get("text", "")),
            "scroll": lambda: self._mcp_scroll(action.get("amount", 0)),
        }

        try:
            success = False
            handler = handlers.get(action_type)
            if handler is None:
                result.error = f"Unknown action type: {action_type}"
            else:
                success = await handler()

            result.success = success
            if not success and not result.error:
                result.error = "MCP tool call failed"

        except Exception as e:
            result.success = False
            result.error = str(e)
            log.error(f"Action execution failed: {e}")

        # Send telemetry
        await self._send_telemetry({
            "type": "action_executed",
            "action": action_type,
            "params": action,
            "success": result.success,
            "error": result.error,
        })

        return result

    # ═══ MCP TOOL WRAPPERS ═══

    async def _call_desktop_tool(self, tool: str, arguments: dict) -> bool:
        """Call a desktop-control tool; True unless the result mentions an error."""
        result = await self.desktop_mcp.call_tool(tool, arguments)
        return not self._tool_failed(result)

    async def _mcp_click(self, x: int, y: int) -> bool:
        """Execute click via desktop-control MCP."""
        return await self._call_desktop_tool("click", {"x": x, "y": y})

    async def _mcp_right_click(self, x: int, y: int) -> bool:
        """Execute right-click via desktop-control MCP."""
        return await self._call_desktop_tool("right_click", {"x": x, "y": y})

    async def _mcp_double_click(self, x: int, y: int) -> bool:
        """Execute double-click via desktop-control MCP."""
        return await self._call_desktop_tool("double_click", {"x": x, "y": y})

    async def _mcp_move_to(self, x: int, y: int) -> bool:
        """Move mouse via desktop-control MCP."""
        return await self._call_desktop_tool("move_to", {"x": x, "y": y})

    async def _mcp_drag_to(self, x: int, y: int, duration: float = 0.5) -> bool:
        """Drag mouse via desktop-control MCP."""
        return await self._call_desktop_tool(
            "drag_to",
            {"x": x, "y": y, "duration": duration},
        )

    async def _mcp_press_key(self, key: str) -> bool:
        """Press key via desktop-control MCP."""
        return await self._call_desktop_tool("press_key", {"key": key})

    async def _mcp_hotkey(self, keys: str) -> bool:
        """Execute hotkey combo via desktop-control MCP."""
        return await self._call_desktop_tool("hotkey", {"keys": keys})

    async def _mcp_type_text(self, text: str) -> bool:
        """Type text via desktop-control MCP."""
        return await self._call_desktop_tool("type_text", {"text": text})

    async def _mcp_scroll(self, amount: int) -> bool:
        """Scroll via desktop-control MCP."""
        return await self._call_desktop_tool("scroll", {"amount": amount})

    # ═══ BANNERLORD-SPECIFIC ACTIONS ═══

    async def open_inventory(self) -> ActionResult:
        """Open inventory screen (I key)."""
        return await self.execute_action({"type": "press_key", "key": "i"})

    async def open_character(self) -> ActionResult:
        """Open character screen (C key)."""
        return await self.execute_action({"type": "press_key", "key": "c"})

    async def open_party(self) -> ActionResult:
        """Open party screen (P key)."""
        return await self.execute_action({"type": "press_key", "key": "p"})

    async def save_game(self) -> ActionResult:
        """Save game (Ctrl+S)."""
        return await self.execute_action({"type": "hotkey", "keys": "ctrl s"})

    async def load_game(self) -> ActionResult:
        """Load game (Ctrl+L)."""
        return await self.execute_action({"type": "hotkey", "keys": "ctrl l"})

    async def click_settlement(self, x: int, y: int) -> ActionResult:
        """Click on a settlement on the campaign map."""
        return await self.execute_action({"type": "click", "x": x, "y": y})

    async def move_army(self, x: int, y: int) -> ActionResult:
        """Right-click to move army on campaign map."""
        return await self.execute_action({"type": "right_click", "x": x, "y": y})

    async def select_unit(self, x: int, y: int) -> ActionResult:
        """Click to select a unit in battle."""
        return await self.execute_action({"type": "click", "x": x, "y": y})

    async def command_unit(self, x: int, y: int) -> ActionResult:
        """Right-click to command a unit in battle."""
        return await self.execute_action({"type": "right_click", "x": x, "y": y})

    # ═══ ODA LOOP (Observe-Decide-Act) ═══

    async def run_observe_decide_act_loop(
        self,
        decision_fn: Callable[[GameState], list[dict]],
        max_iterations: int = 10,
        iteration_delay: float = 2.0,
    ):
        """
        The core ODA loop — proves the harness works.

        1. OBSERVE: Capture game state (screenshot, stats)
        2. DECIDE: Call decision_fn(state) to get actions
        3. ACT: Execute each action
        4. REPEAT

        The loop ends after ``max_iterations`` cycles or as soon as
        ``self.running`` is cleared (for example by stop()).

        Args:
            decision_fn: Function that takes GameState and returns list of
                actions. It may also be a coroutine function; a ``None``
                return is treated as "no actions".
            max_iterations: Maximum number of ODA cycles
            iteration_delay: Seconds to wait between cycles
        """
        log.info("=" * 50)
        log.info("STARTING ODA LOOP")
        log.info(f" Max iterations: {max_iterations}")
        log.info(f" Iteration delay: {iteration_delay}s")
        log.info("=" * 50)

        self.running = True
        # Counted separately from cycle_count so the summary is right even
        # when no cycle runs or the loop is stopped early.
        completed_cycles = 0

        for iteration in range(max_iterations):
            if not self.running:
                break

            self.cycle_count = iteration
            log.info(f"\n--- ODA Cycle {iteration + 1}/{max_iterations} ---")

            # 1. OBSERVE: Capture state
            log.info("[OBSERVE] Capturing game state...")
            state = await self.capture_state()
            log.info(f" Screenshot: {state.visual.screenshot_path}")
            log.info(f" Window found: {state.visual.window_found}")
            log.info(f" Screen: {state.visual.screen_size}")
            log.info(f" Players online: {state.game_context.current_players_online}")

            # 2. DECIDE: Get actions from decision function
            log.info("[DECIDE] Getting actions...")
            actions = decision_fn(state)
            if asyncio.iscoroutine(actions):
                actions = await actions
            actions = actions or []
            log.info(f" Decision returned {len(actions)} actions")

            # 3. ACT: Execute actions
            log.info("[ACT] Executing actions...")
            results = []
            for i, action in enumerate(actions):
                log.info(f" Action {i+1}/{len(actions)}: {action.get('type', 'unknown')}")
                result = await self.execute_action(action)
                results.append(result)
                log.info(f" Result: {'SUCCESS' if result.success else 'FAILED'}")
                if result.error:
                    log.info(f" Error: {result.error}")

            # Send cycle summary telemetry
            await self._send_telemetry({
                "type": "oda_cycle_complete",
                "cycle": iteration,
                "actions_executed": len(actions),
                "successful": sum(1 for r in results if r.success),
                "failed": sum(1 for r in results if not r.success),
            })
            completed_cycles += 1

            # Delay before next iteration
            if iteration < max_iterations - 1:
                await asyncio.sleep(iteration_delay)

        log.info("\n" + "=" * 50)
        log.info("ODA LOOP COMPLETE")
        log.info(f"Total cycles: {completed_cycles}")
        log.info("=" * 50)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# SIMPLE DECISION FUNCTIONS FOR TESTING
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
def simple_test_decision(state: GameState) -> list[dict]:
    """
    Minimal decision function used to exercise the ODA loop.

    If a window was found, the pointer is first moved to the center of the
    screen; a "space" key press always follows. A real implementation would
    instead analyze the screenshot and game context to choose its actions.
    """
    key_probe = {"type": "press_key", "key": "space"}

    # Without a window only the key press is issued.
    if not state.visual.window_found:
        return [key_probe]

    size = state.visual.screen_size
    recenter = {"type": "move_to", "x": size[0] // 2, "y": size[1] // 2}
    return [recenter, key_probe]
|
|
|
|
|
|
def bannerlord_campaign_decision(state: GameState) -> list[dict]:
    """
    Example decision function for Bannerlord campaign mode.

    Always returns the same two actions: move the pointer to the middle of
    the screen, then press "p" to open the party screen. It stands in for a
    vision-language model that would pick actions from the screenshot.
    """
    width, height = state.visual.screen_size

    pointer_to_center = {
        "type": "move_to",
        "x": int(width * 0.5),
        "y": int(height * 0.5),
    }
    # Open party screen to check troops
    open_party = {"type": "press_key", "key": "p"}

    return [pointer_to_center, open_party]
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# CLI ENTRYPOINT
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
async def main():
    """
    Command-line smoke test for the Bannerlord harness.

    Starts the harness, runs the ODA loop with simple_test_decision for the
    requested number of iterations, then opens the inventory, character, and
    party screens in turn. The harness is always shut down on exit.

    Usage:
        python bannerlord_harness.py [--mock] [--hermes-ws URL]
                                     [--iterations N] [--delay SECONDS]
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Bannerlord MCP Harness — Test the ODA loop"
    )
    option_specs = (
        ("--mock", {
            "action": "store_true",
            "help": "Run in mock mode (no actual MCP servers)",
        }),
        ("--hermes-ws", {
            "default": DEFAULT_HERMES_WS_URL,
            "help": f"Hermes WebSocket URL (default: {DEFAULT_HERMES_WS_URL})",
        }),
        ("--iterations", {
            "type": int,
            "default": 3,
            "help": "Number of ODA iterations (default: 3)",
        }),
        ("--delay", {
            "type": float,
            "default": 1.0,
            "help": "Delay between iterations in seconds (default: 1.0)",
        }),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    opts = cli.parse_args()

    harness = BannerlordHarness(
        hermes_ws_url=opts.hermes_ws,
        enable_mock=opts.mock,
    )

    try:
        await harness.start()

        await harness.run_observe_decide_act_loop(
            decision_fn=simple_test_decision,
            max_iterations=opts.iterations,
            iteration_delay=opts.delay,
        )

        # Demonstrate Bannerlord-specific actions, pausing between screens.
        log.info("\n--- Testing Bannerlord-specific actions ---")
        screens = (
            harness.open_inventory,
            harness.open_character,
            harness.open_party,
        )
        for position, open_screen in enumerate(screens):
            if position:
                await asyncio.sleep(0.5)
            await open_screen()

    except KeyboardInterrupt:
        log.info("Interrupted by user")
    finally:
        # Cleanup
        await harness.stop()


if __name__ == "__main__":
    asyncio.run(main())
|