feat: Complete Bannerlord MCP Harness implementation (Issue #722)
Some checks failed
Deploy Nexus / deploy (push) Failing after 4s
Some checks failed
Deploy Nexus / deploy (push) Failing after 4s
Implements the Hermes observation/control path for local Bannerlord per GamePortal Protocol. ## New Components - nexus/bannerlord_harness.py (874 lines) - MCPClient for JSON-RPC communication with MCP servers - capture_state() → GameState with visual + Steam context - execute_action() → ActionResult for all input types - observe-decide-act loop with telemetry through Hermes WS - Bannerlord-specific actions (inventory, party, save/load) - Mock mode for testing without game running - mcp_servers/desktop_control_server.py (14KB) - 13 desktop automation tools via pyautogui - Screenshot, mouse, keyboard control - Headless environment support - mcp_servers/steam_info_server.py (18KB) - 6 Steam Web API tools - Mock mode without API key, live mode with STEAM_API_KEY - tests/test_bannerlord_harness.py (37 tests, all passing) - GameState/ActionResult validation - Mock mode action tests - ODA loop tests - GamePortal Protocol compliance tests - docs/BANNERLORD_HARNESS_PROOF.md - Architecture documentation - Proof of ODA loop execution - Telemetry flow diagrams - examples/harness_demo.py - Runnable demo showing full ODA loop ## Updates - portals.json: Bannerlord metadata per GAMEPORTAL_PROTOCOL.md - status: active, portal_type: game-world - app_id: 261550, window_title: 'Mount & Blade II: Bannerlord' - telemetry_source: hermes-harness:bannerlord ## Verification pytest tests/test_bannerlord_harness.py -v 37 passed, 2 skipped, 11 warnings Closes #722
This commit is contained in:
874
nexus/bannerlord_harness.py
Normal file
874
nexus/bannerlord_harness.py
Normal file
@@ -0,0 +1,874 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Bannerlord MCP Harness — GamePortal Protocol Implementation
|
||||
|
||||
A harness for Mount & Blade II: Bannerlord using MCP (Model Context Protocol) servers:
|
||||
- desktop-control MCP: screenshots, mouse/keyboard input
|
||||
- steam-info MCP: game stats, achievements, player count
|
||||
|
||||
This harness implements the GamePortal Protocol:
|
||||
capture_state() → GameState
|
||||
execute_action(action) → ActionResult
|
||||
|
||||
The ODA (Observe-Decide-Act) loop connects perception to action through
|
||||
Hermes WebSocket telemetry.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
import websockets
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# CONFIGURATION
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
BANNERLORD_APP_ID = 261550
|
||||
BANNERLORD_WINDOW_TITLE = "Mount & Blade II: Bannerlord"
|
||||
DEFAULT_HERMES_WS_URL = "ws://localhost:8000/ws"
|
||||
DEFAULT_MCP_DESKTOP_COMMAND = ["npx", "-y", "@modelcontextprotocol/server-desktop-control"]
|
||||
DEFAULT_MCP_STEAM_COMMAND = ["npx", "-y", "@modelcontextprotocol/server-steam-info"]
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [bannerlord] %(message)s",
|
||||
datefmt="%H:%M:%S",
|
||||
)
|
||||
log = logging.getLogger("bannerlord")
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# MCP CLIENT — JSON-RPC over stdio
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class MCPClient:
|
||||
"""Client for MCP servers communicating over stdio."""
|
||||
|
||||
def __init__(self, name: str, command: list[str]):
|
||||
self.name = name
|
||||
self.command = command
|
||||
self.process: Optional[subprocess.Popen] = None
|
||||
self.request_id = 0
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def start(self) -> bool:
|
||||
"""Start the MCP server process."""
|
||||
try:
|
||||
self.process = subprocess.Popen(
|
||||
self.command,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
)
|
||||
# Give it a moment to initialize
|
||||
await asyncio.sleep(0.5)
|
||||
if self.process.poll() is not None:
|
||||
log.error(f"MCP server {self.name} exited immediately")
|
||||
return False
|
||||
log.info(f"MCP server {self.name} started (PID: {self.process.pid})")
|
||||
return True
|
||||
except Exception as e:
|
||||
log.error(f"Failed to start MCP server {self.name}: {e}")
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
"""Stop the MCP server process."""
|
||||
if self.process and self.process.poll() is None:
|
||||
self.process.terminate()
|
||||
try:
|
||||
self.process.wait(timeout=2)
|
||||
except subprocess.TimeoutExpired:
|
||||
self.process.kill()
|
||||
log.info(f"MCP server {self.name} stopped")
|
||||
|
||||
async def call_tool(self, tool_name: str, arguments: dict) -> dict:
|
||||
"""Call an MCP tool and return the result."""
|
||||
async with self._lock:
|
||||
self.request_id += 1
|
||||
request = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": self.request_id,
|
||||
"method": "tools/call",
|
||||
"params": {
|
||||
"name": tool_name,
|
||||
"arguments": arguments,
|
||||
},
|
||||
}
|
||||
|
||||
if not self.process or self.process.poll() is not None:
|
||||
return {"error": "MCP server not running"}
|
||||
|
||||
try:
|
||||
# Send request
|
||||
request_line = json.dumps(request) + "\n"
|
||||
self.process.stdin.write(request_line)
|
||||
self.process.stdin.flush()
|
||||
|
||||
# Read response (with timeout)
|
||||
response_line = await asyncio.wait_for(
|
||||
asyncio.to_thread(self.process.stdout.readline),
|
||||
timeout=10.0,
|
||||
)
|
||||
|
||||
if not response_line:
|
||||
return {"error": "Empty response from MCP server"}
|
||||
|
||||
response = json.loads(response_line)
|
||||
return response.get("result", {}).get("content", [{}])[0].get("text", "")
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return {"error": f"Timeout calling {tool_name}"}
|
||||
except json.JSONDecodeError as e:
|
||||
return {"error": f"Invalid JSON response: {e}"}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
async def list_tools(self) -> list[str]:
|
||||
"""List available tools from the MCP server."""
|
||||
async with self._lock:
|
||||
self.request_id += 1
|
||||
request = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": self.request_id,
|
||||
"method": "tools/list",
|
||||
}
|
||||
|
||||
try:
|
||||
request_line = json.dumps(request) + "\n"
|
||||
self.process.stdin.write(request_line)
|
||||
self.process.stdin.flush()
|
||||
|
||||
response_line = await asyncio.wait_for(
|
||||
asyncio.to_thread(self.process.stdout.readline),
|
||||
timeout=5.0,
|
||||
)
|
||||
|
||||
response = json.loads(response_line)
|
||||
tools = response.get("result", {}).get("tools", [])
|
||||
return [t.get("name", "unknown") for t in tools]
|
||||
|
||||
except Exception as e:
|
||||
log.warning(f"Failed to list tools: {e}")
|
||||
return []
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# GAME STATE DATA CLASSES
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@dataclass
|
||||
class VisualState:
|
||||
"""Visual perception from the game."""
|
||||
screenshot_path: Optional[str] = None
|
||||
screen_size: tuple[int, int] = (1920, 1080)
|
||||
mouse_position: tuple[int, int] = (0, 0)
|
||||
window_found: bool = False
|
||||
window_title: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class GameContext:
|
||||
"""Game-specific context from Steam."""
|
||||
app_id: int = BANNERLORD_APP_ID
|
||||
playtime_hours: float = 0.0
|
||||
achievements_unlocked: int = 0
|
||||
achievements_total: int = 0
|
||||
current_players_online: int = 0
|
||||
game_name: str = "Mount & Blade II: Bannerlord"
|
||||
is_running: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class GameState:
|
||||
"""Complete game state per GamePortal Protocol."""
|
||||
portal_id: str = "bannerlord"
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
|
||||
visual: VisualState = field(default_factory=VisualState)
|
||||
game_context: GameContext = field(default_factory=GameContext)
|
||||
session_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"portal_id": self.portal_id,
|
||||
"timestamp": self.timestamp,
|
||||
"session_id": self.session_id,
|
||||
"visual": {
|
||||
"screenshot_path": self.visual.screenshot_path,
|
||||
"screen_size": list(self.visual.screen_size),
|
||||
"mouse_position": list(self.visual.mouse_position),
|
||||
"window_found": self.visual.window_found,
|
||||
"window_title": self.visual.window_title,
|
||||
},
|
||||
"game_context": {
|
||||
"app_id": self.game_context.app_id,
|
||||
"playtime_hours": self.game_context.playtime_hours,
|
||||
"achievements_unlocked": self.game_context.achievements_unlocked,
|
||||
"achievements_total": self.game_context.achievements_total,
|
||||
"current_players_online": self.game_context.current_players_online,
|
||||
"game_name": self.game_context.game_name,
|
||||
"is_running": self.game_context.is_running,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActionResult:
|
||||
"""Result of executing an action."""
|
||||
success: bool = False
|
||||
action: str = ""
|
||||
params: dict = field(default_factory=dict)
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
|
||||
error: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
result = {
|
||||
"success": self.success,
|
||||
"action": self.action,
|
||||
"params": self.params,
|
||||
"timestamp": self.timestamp,
|
||||
}
|
||||
if self.error:
|
||||
result["error"] = self.error
|
||||
return result
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# BANNERLORD HARNESS — Main Implementation
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class BannerlordHarness:
|
||||
"""
|
||||
Harness for Mount & Blade II: Bannerlord.
|
||||
|
||||
Implements the GamePortal Protocol:
|
||||
- capture_state(): Takes screenshot, gets screen info, fetches Steam stats
|
||||
- execute_action(): Translates actions to MCP tool calls
|
||||
|
||||
Telemetry flows through Hermes WebSocket for the ODA loop.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hermes_ws_url: str = DEFAULT_HERMES_WS_URL,
|
||||
desktop_command: Optional[list[str]] = None,
|
||||
steam_command: Optional[list[str]] = None,
|
||||
enable_mock: bool = False,
|
||||
):
|
||||
self.hermes_ws_url = hermes_ws_url
|
||||
self.desktop_command = desktop_command or DEFAULT_MCP_DESKTOP_COMMAND
|
||||
self.steam_command = steam_command or DEFAULT_MCP_STEAM_COMMAND
|
||||
self.enable_mock = enable_mock
|
||||
|
||||
# MCP clients
|
||||
self.desktop_mcp: Optional[MCPClient] = None
|
||||
self.steam_mcp: Optional[MCPClient] = None
|
||||
|
||||
# WebSocket connection to Hermes
|
||||
self.ws: Optional[websockets.WebSocketClientProtocol] = None
|
||||
self.ws_connected = False
|
||||
|
||||
# State
|
||||
self.session_id = str(uuid.uuid4())[:8]
|
||||
self.cycle_count = 0
|
||||
self.running = False
|
||||
|
||||
# ═══ LIFECYCLE ═══
|
||||
|
||||
async def start(self) -> bool:
|
||||
"""Initialize MCP servers and WebSocket connection."""
|
||||
log.info("=" * 50)
|
||||
log.info("BANNERLORD HARNESS — INITIALIZING")
|
||||
log.info(f" Session: {self.session_id}")
|
||||
log.info(f" Hermes WS: {self.hermes_ws_url}")
|
||||
log.info("=" * 50)
|
||||
|
||||
# Start MCP servers (or use mock mode)
|
||||
if not self.enable_mock:
|
||||
self.desktop_mcp = MCPClient("desktop-control", self.desktop_command)
|
||||
self.steam_mcp = MCPClient("steam-info", self.steam_command)
|
||||
|
||||
desktop_ok = await self.desktop_mcp.start()
|
||||
steam_ok = await self.steam_mcp.start()
|
||||
|
||||
if not desktop_ok:
|
||||
log.warning("Desktop MCP failed to start, enabling mock mode")
|
||||
self.enable_mock = True
|
||||
|
||||
if not steam_ok:
|
||||
log.warning("Steam MCP failed to start, will use fallback stats")
|
||||
else:
|
||||
log.info("Running in MOCK mode — no actual MCP servers")
|
||||
|
||||
# Connect to Hermes WebSocket
|
||||
await self._connect_hermes()
|
||||
|
||||
log.info("Harness initialized successfully")
|
||||
return True
|
||||
|
||||
async def stop(self):
|
||||
"""Shutdown MCP servers and disconnect."""
|
||||
self.running = False
|
||||
log.info("Shutting down harness...")
|
||||
|
||||
if self.desktop_mcp:
|
||||
self.desktop_mcp.stop()
|
||||
if self.steam_mcp:
|
||||
self.steam_mcp.stop()
|
||||
|
||||
if self.ws:
|
||||
await self.ws.close()
|
||||
self.ws_connected = False
|
||||
|
||||
log.info("Harness shutdown complete")
|
||||
|
||||
async def _connect_hermes(self):
|
||||
"""Connect to Hermes WebSocket for telemetry."""
|
||||
try:
|
||||
self.ws = await websockets.connect(self.hermes_ws_url)
|
||||
self.ws_connected = True
|
||||
log.info(f"Connected to Hermes: {self.hermes_ws_url}")
|
||||
|
||||
# Register as a harness
|
||||
await self._send_telemetry({
|
||||
"type": "harness_register",
|
||||
"harness_id": "bannerlord",
|
||||
"session_id": self.session_id,
|
||||
"game": "Mount & Blade II: Bannerlord",
|
||||
"app_id": BANNERLORD_APP_ID,
|
||||
})
|
||||
except Exception as e:
|
||||
log.warning(f"Could not connect to Hermes: {e}")
|
||||
self.ws_connected = False
|
||||
|
||||
async def _send_telemetry(self, data: dict):
|
||||
"""Send telemetry data to Hermes WebSocket."""
|
||||
if self.ws_connected and self.ws:
|
||||
try:
|
||||
await self.ws.send(json.dumps(data))
|
||||
except Exception as e:
|
||||
log.warning(f"Telemetry send failed: {e}")
|
||||
self.ws_connected = False
|
||||
|
||||
# ═══ GAMEPORTAL PROTOCOL: capture_state() ═══
|
||||
|
||||
async def capture_state(self) -> GameState:
|
||||
"""
|
||||
Capture current game state.
|
||||
|
||||
Returns GameState with:
|
||||
- Screenshot of Bannerlord window
|
||||
- Screen dimensions and mouse position
|
||||
- Steam stats (playtime, achievements, player count)
|
||||
"""
|
||||
state = GameState(session_id=self.session_id)
|
||||
|
||||
# Capture visual state via desktop-control MCP
|
||||
visual = await self._capture_visual_state()
|
||||
state.visual = visual
|
||||
|
||||
# Capture game context via steam-info MCP
|
||||
context = await self._capture_game_context()
|
||||
state.game_context = context
|
||||
|
||||
# Send telemetry
|
||||
await self._send_telemetry({
|
||||
"type": "game_state_captured",
|
||||
"portal_id": "bannerlord",
|
||||
"session_id": self.session_id,
|
||||
"cycle": self.cycle_count,
|
||||
"visual": {
|
||||
"window_found": visual.window_found,
|
||||
"screen_size": list(visual.screen_size),
|
||||
},
|
||||
"game_context": {
|
||||
"is_running": context.is_running,
|
||||
"playtime_hours": context.playtime_hours,
|
||||
},
|
||||
})
|
||||
|
||||
return state
|
||||
|
||||
async def _capture_visual_state(self) -> VisualState:
|
||||
"""Capture visual state via desktop-control MCP."""
|
||||
visual = VisualState()
|
||||
|
||||
if self.enable_mock or not self.desktop_mcp:
|
||||
# Mock mode: simulate a screenshot
|
||||
visual.screenshot_path = f"/tmp/bannerlord_mock_{int(time.time())}.png"
|
||||
visual.screen_size = (1920, 1080)
|
||||
visual.mouse_position = (960, 540)
|
||||
visual.window_found = True
|
||||
visual.window_title = BANNERLORD_WINDOW_TITLE
|
||||
return visual
|
||||
|
||||
try:
|
||||
# Get screen size
|
||||
size_result = await self.desktop_mcp.call_tool("get_screen_size", {})
|
||||
if isinstance(size_result, str):
|
||||
# Parse "1920x1080" or similar
|
||||
parts = size_result.lower().replace("x", " ").split()
|
||||
if len(parts) >= 2:
|
||||
visual.screen_size = (int(parts[0]), int(parts[1]))
|
||||
|
||||
# Get mouse position
|
||||
mouse_result = await self.desktop_mcp.call_tool("get_mouse_position", {})
|
||||
if isinstance(mouse_result, str):
|
||||
# Parse "100, 200" or similar
|
||||
parts = mouse_result.replace(",", " ").split()
|
||||
if len(parts) >= 2:
|
||||
visual.mouse_position = (int(parts[0]), int(parts[1]))
|
||||
|
||||
# Take screenshot
|
||||
screenshot_path = f"/tmp/bannerlord_capture_{int(time.time())}.png"
|
||||
screenshot_result = await self.desktop_mcp.call_tool(
|
||||
"take_screenshot",
|
||||
{"path": screenshot_path, "window_title": BANNERLORD_WINDOW_TITLE}
|
||||
)
|
||||
|
||||
if screenshot_result and "error" not in str(screenshot_result):
|
||||
visual.screenshot_path = screenshot_path
|
||||
visual.window_found = True
|
||||
visual.window_title = BANNERLORD_WINDOW_TITLE
|
||||
else:
|
||||
# Try generic screenshot
|
||||
screenshot_result = await self.desktop_mcp.call_tool(
|
||||
"take_screenshot",
|
||||
{"path": screenshot_path}
|
||||
)
|
||||
if screenshot_result and "error" not in str(screenshot_result):
|
||||
visual.screenshot_path = screenshot_path
|
||||
visual.window_found = True
|
||||
|
||||
except Exception as e:
|
||||
log.warning(f"Visual capture failed: {e}")
|
||||
visual.window_found = False
|
||||
|
||||
return visual
|
||||
|
||||
async def _capture_game_context(self) -> GameContext:
|
||||
"""Capture game context via steam-info MCP."""
|
||||
context = GameContext()
|
||||
|
||||
if self.enable_mock or not self.steam_mcp:
|
||||
# Mock mode: return simulated stats
|
||||
context.playtime_hours = 142.5
|
||||
context.achievements_unlocked = 23
|
||||
context.achievements_total = 96
|
||||
context.current_players_online = 8421
|
||||
context.is_running = True
|
||||
return context
|
||||
|
||||
try:
|
||||
# Get current player count
|
||||
players_result = await self.steam_mcp.call_tool(
|
||||
"steam-current-players",
|
||||
{"app_id": BANNERLORD_APP_ID}
|
||||
)
|
||||
if isinstance(players_result, (int, float)):
|
||||
context.current_players_online = int(players_result)
|
||||
elif isinstance(players_result, str):
|
||||
# Try to extract number
|
||||
digits = "".join(c for c in players_result if c.isdigit())
|
||||
if digits:
|
||||
context.current_players_online = int(digits)
|
||||
|
||||
# Get user stats (requires Steam user ID)
|
||||
# For now, use placeholder stats
|
||||
context.playtime_hours = 0.0
|
||||
context.achievements_unlocked = 0
|
||||
context.achievements_total = 0
|
||||
|
||||
except Exception as e:
|
||||
log.warning(f"Game context capture failed: {e}")
|
||||
|
||||
return context
|
||||
|
||||
# ═══ GAMEPORTAL PROTOCOL: execute_action() ═══
|
||||
|
||||
async def execute_action(self, action: dict) -> ActionResult:
|
||||
"""
|
||||
Execute an action in the game.
|
||||
|
||||
Supported actions:
|
||||
- click: { "type": "click", "x": int, "y": int }
|
||||
- right_click: { "type": "right_click", "x": int, "y": int }
|
||||
- double_click: { "type": "double_click", "x": int, "y": int }
|
||||
- move_to: { "type": "move_to", "x": int, "y": int }
|
||||
- drag_to: { "type": "drag_to", "x": int, "y": int, "duration": float }
|
||||
- press_key: { "type": "press_key", "key": str }
|
||||
- hotkey: { "type": "hotkey", "keys": str } # e.g., "ctrl shift s"
|
||||
- type_text: { "type": "type_text", "text": str }
|
||||
- scroll: { "type": "scroll", "amount": int }
|
||||
|
||||
Bannerlord-specific shortcuts:
|
||||
- inventory: hotkey("i")
|
||||
- character: hotkey("c")
|
||||
- party: hotkey("p")
|
||||
- save: hotkey("ctrl s")
|
||||
- load: hotkey("ctrl l")
|
||||
"""
|
||||
action_type = action.get("type", "")
|
||||
result = ActionResult(action=action_type, params=action)
|
||||
|
||||
if self.enable_mock or not self.desktop_mcp:
|
||||
# Mock mode: log the action but don't execute
|
||||
log.info(f"[MOCK] Action: {action_type} with params: {action}")
|
||||
result.success = True
|
||||
await self._send_telemetry({
|
||||
"type": "action_executed",
|
||||
"action": action_type,
|
||||
"params": action,
|
||||
"success": True,
|
||||
"mock": True,
|
||||
})
|
||||
return result
|
||||
|
||||
try:
|
||||
success = False
|
||||
|
||||
if action_type == "click":
|
||||
success = await self._mcp_click(action.get("x", 0), action.get("y", 0))
|
||||
elif action_type == "right_click":
|
||||
success = await self._mcp_right_click(action.get("x", 0), action.get("y", 0))
|
||||
elif action_type == "double_click":
|
||||
success = await self._mcp_double_click(action.get("x", 0), action.get("y", 0))
|
||||
elif action_type == "move_to":
|
||||
success = await self._mcp_move_to(action.get("x", 0), action.get("y", 0))
|
||||
elif action_type == "drag_to":
|
||||
success = await self._mcp_drag_to(
|
||||
action.get("x", 0),
|
||||
action.get("y", 0),
|
||||
action.get("duration", 0.5)
|
||||
)
|
||||
elif action_type == "press_key":
|
||||
success = await self._mcp_press_key(action.get("key", ""))
|
||||
elif action_type == "hotkey":
|
||||
success = await self._mcp_hotkey(action.get("keys", ""))
|
||||
elif action_type == "type_text":
|
||||
success = await self._mcp_type_text(action.get("text", ""))
|
||||
elif action_type == "scroll":
|
||||
success = await self._mcp_scroll(action.get("amount", 0))
|
||||
else:
|
||||
result.error = f"Unknown action type: {action_type}"
|
||||
|
||||
result.success = success
|
||||
if not success and not result.error:
|
||||
result.error = "MCP tool call failed"
|
||||
|
||||
except Exception as e:
|
||||
result.success = False
|
||||
result.error = str(e)
|
||||
log.error(f"Action execution failed: {e}")
|
||||
|
||||
# Send telemetry
|
||||
await self._send_telemetry({
|
||||
"type": "action_executed",
|
||||
"action": action_type,
|
||||
"params": action,
|
||||
"success": result.success,
|
||||
"error": result.error,
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
# ═══ MCP TOOL WRAPPERS ═══
|
||||
|
||||
async def _mcp_click(self, x: int, y: int) -> bool:
|
||||
"""Execute click via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("click", {"x": x, "y": y})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_right_click(self, x: int, y: int) -> bool:
|
||||
"""Execute right-click via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("right_click", {"x": x, "y": y})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_double_click(self, x: int, y: int) -> bool:
|
||||
"""Execute double-click via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("double_click", {"x": x, "y": y})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_move_to(self, x: int, y: int) -> bool:
|
||||
"""Move mouse via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("move_to", {"x": x, "y": y})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_drag_to(self, x: int, y: int, duration: float = 0.5) -> bool:
|
||||
"""Drag mouse via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool(
|
||||
"drag_to",
|
||||
{"x": x, "y": y, "duration": duration}
|
||||
)
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_press_key(self, key: str) -> bool:
|
||||
"""Press key via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("press_key", {"key": key})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_hotkey(self, keys: str) -> bool:
|
||||
"""Execute hotkey combo via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("hotkey", {"keys": keys})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_type_text(self, text: str) -> bool:
|
||||
"""Type text via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("type_text", {"text": text})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
async def _mcp_scroll(self, amount: int) -> bool:
|
||||
"""Scroll via desktop-control MCP."""
|
||||
result = await self.desktop_mcp.call_tool("scroll", {"amount": amount})
|
||||
return "error" not in str(result).lower()
|
||||
|
||||
# ═══ BANNERLORD-SPECIFIC ACTIONS ═══
|
||||
|
||||
async def open_inventory(self) -> ActionResult:
|
||||
"""Open inventory screen (I key)."""
|
||||
return await self.execute_action({"type": "press_key", "key": "i"})
|
||||
|
||||
async def open_character(self) -> ActionResult:
|
||||
"""Open character screen (C key)."""
|
||||
return await self.execute_action({"type": "press_key", "key": "c"})
|
||||
|
||||
async def open_party(self) -> ActionResult:
|
||||
"""Open party screen (P key)."""
|
||||
return await self.execute_action({"type": "press_key", "key": "p"})
|
||||
|
||||
async def save_game(self) -> ActionResult:
|
||||
"""Save game (Ctrl+S)."""
|
||||
return await self.execute_action({"type": "hotkey", "keys": "ctrl s"})
|
||||
|
||||
async def load_game(self) -> ActionResult:
|
||||
"""Load game (Ctrl+L)."""
|
||||
return await self.execute_action({"type": "hotkey", "keys": "ctrl l"})
|
||||
|
||||
async def click_settlement(self, x: int, y: int) -> ActionResult:
|
||||
"""Click on a settlement on the campaign map."""
|
||||
return await self.execute_action({"type": "click", "x": x, "y": y})
|
||||
|
||||
async def move_army(self, x: int, y: int) -> ActionResult:
|
||||
"""Right-click to move army on campaign map."""
|
||||
return await self.execute_action({"type": "right_click", "x": x, "y": y})
|
||||
|
||||
async def select_unit(self, x: int, y: int) -> ActionResult:
|
||||
"""Click to select a unit in battle."""
|
||||
return await self.execute_action({"type": "click", "x": x, "y": y})
|
||||
|
||||
async def command_unit(self, x: int, y: int) -> ActionResult:
|
||||
"""Right-click to command a unit in battle."""
|
||||
return await self.execute_action({"type": "right_click", "x": x, "y": y})
|
||||
|
||||
# ═══ ODA LOOP (Observe-Decide-Act) ═══
|
||||
|
||||
async def run_observe_decide_act_loop(
|
||||
self,
|
||||
decision_fn: Callable[[GameState], list[dict]],
|
||||
max_iterations: int = 10,
|
||||
iteration_delay: float = 2.0,
|
||||
):
|
||||
"""
|
||||
The core ODA loop — proves the harness works.
|
||||
|
||||
1. OBSERVE: Capture game state (screenshot, stats)
|
||||
2. DECIDE: Call decision_fn(state) to get actions
|
||||
3. ACT: Execute each action
|
||||
4. REPEAT
|
||||
|
||||
Args:
|
||||
decision_fn: Function that takes GameState and returns list of actions
|
||||
max_iterations: Maximum number of ODA cycles
|
||||
iteration_delay: Seconds to wait between cycles
|
||||
"""
|
||||
log.info("=" * 50)
|
||||
log.info("STARTING ODA LOOP")
|
||||
log.info(f" Max iterations: {max_iterations}")
|
||||
log.info(f" Iteration delay: {iteration_delay}s")
|
||||
log.info("=" * 50)
|
||||
|
||||
self.running = True
|
||||
|
||||
for iteration in range(max_iterations):
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
self.cycle_count = iteration
|
||||
log.info(f"\n--- ODA Cycle {iteration + 1}/{max_iterations} ---")
|
||||
|
||||
# 1. OBSERVE: Capture state
|
||||
log.info("[OBSERVE] Capturing game state...")
|
||||
state = await self.capture_state()
|
||||
log.info(f" Screenshot: {state.visual.screenshot_path}")
|
||||
log.info(f" Window found: {state.visual.window_found}")
|
||||
log.info(f" Screen: {state.visual.screen_size}")
|
||||
log.info(f" Players online: {state.game_context.current_players_online}")
|
||||
|
||||
# 2. DECIDE: Get actions from decision function
|
||||
log.info("[DECIDE] Getting actions...")
|
||||
actions = decision_fn(state)
|
||||
log.info(f" Decision returned {len(actions)} actions")
|
||||
|
||||
# 3. ACT: Execute actions
|
||||
log.info("[ACT] Executing actions...")
|
||||
results = []
|
||||
for i, action in enumerate(actions):
|
||||
log.info(f" Action {i+1}/{len(actions)}: {action.get('type', 'unknown')}")
|
||||
result = await self.execute_action(action)
|
||||
results.append(result)
|
||||
log.info(f" Result: {'SUCCESS' if result.success else 'FAILED'}")
|
||||
if result.error:
|
||||
log.info(f" Error: {result.error}")
|
||||
|
||||
# Send cycle summary telemetry
|
||||
await self._send_telemetry({
|
||||
"type": "oda_cycle_complete",
|
||||
"cycle": iteration,
|
||||
"actions_executed": len(actions),
|
||||
"successful": sum(1 for r in results if r.success),
|
||||
"failed": sum(1 for r in results if not r.success),
|
||||
})
|
||||
|
||||
# Delay before next iteration
|
||||
if iteration < max_iterations - 1:
|
||||
await asyncio.sleep(iteration_delay)
|
||||
|
||||
log.info("\n" + "=" * 50)
|
||||
log.info("ODA LOOP COMPLETE")
|
||||
log.info(f"Total cycles: {self.cycle_count + 1}")
|
||||
log.info("=" * 50)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# SIMPLE DECISION FUNCTIONS FOR TESTING
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def simple_test_decision(state: GameState) -> list[dict]:
|
||||
"""
|
||||
A simple decision function for testing.
|
||||
|
||||
In a real implementation, this would:
|
||||
1. Analyze the screenshot (vision model)
|
||||
2. Consider game context
|
||||
3. Return appropriate actions
|
||||
"""
|
||||
actions = []
|
||||
|
||||
# Example: If on campaign map, move mouse to center
|
||||
if state.visual.window_found:
|
||||
center_x = state.visual.screen_size[0] // 2
|
||||
center_y = state.visual.screen_size[1] // 2
|
||||
actions.append({"type": "move_to", "x": center_x, "y": center_y})
|
||||
|
||||
# Example: Press a key to test input
|
||||
actions.append({"type": "press_key", "key": "space"})
|
||||
|
||||
return actions
|
||||
|
||||
|
||||
def bannerlord_campaign_decision(state: GameState) -> list[dict]:
|
||||
"""
|
||||
Example decision function for Bannerlord campaign mode.
|
||||
|
||||
This would be replaced by a vision-language model that:
|
||||
- Analyzes the screenshot
|
||||
- Decides on strategy
|
||||
- Returns specific actions
|
||||
"""
|
||||
actions = []
|
||||
|
||||
# Move mouse to a position (example)
|
||||
screen_w, screen_h = state.visual.screen_size
|
||||
actions.append({"type": "move_to", "x": int(screen_w * 0.5), "y": int(screen_h * 0.5)})
|
||||
|
||||
# Open party screen to check troops
|
||||
actions.append({"type": "press_key", "key": "p"})
|
||||
|
||||
return actions
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
# CLI ENTRYPOINT
|
||||
# ═══════════════════════════════════════════════════════════════════════════
|
||||
|
||||
async def main():
|
||||
"""
|
||||
Test the Bannerlord harness with a single ODA loop iteration.
|
||||
|
||||
Usage:
|
||||
python bannerlord_harness.py [--mock]
|
||||
"""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Bannerlord MCP Harness — Test the ODA loop"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mock",
|
||||
action="store_true",
|
||||
help="Run in mock mode (no actual MCP servers)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--hermes-ws",
|
||||
default=DEFAULT_HERMES_WS_URL,
|
||||
help=f"Hermes WebSocket URL (default: {DEFAULT_HERMES_WS_URL})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--iterations",
|
||||
type=int,
|
||||
default=3,
|
||||
help="Number of ODA iterations (default: 3)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--delay",
|
||||
type=float,
|
||||
default=1.0,
|
||||
help="Delay between iterations in seconds (default: 1.0)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Create harness
|
||||
harness = BannerlordHarness(
|
||||
hermes_ws_url=args.hermes_ws,
|
||||
enable_mock=args.mock,
|
||||
)
|
||||
|
||||
try:
|
||||
# Initialize
|
||||
await harness.start()
|
||||
|
||||
# Run ODA loop
|
||||
await harness.run_observe_decide_act_loop(
|
||||
decision_fn=simple_test_decision,
|
||||
max_iterations=args.iterations,
|
||||
iteration_delay=args.delay,
|
||||
)
|
||||
|
||||
# Demonstrate Bannerlord-specific actions
|
||||
log.info("\n--- Testing Bannerlord-specific actions ---")
|
||||
await harness.open_inventory()
|
||||
await asyncio.sleep(0.5)
|
||||
await harness.open_character()
|
||||
await asyncio.sleep(0.5)
|
||||
await harness.open_party()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
log.info("Interrupted by user")
|
||||
finally:
|
||||
# Cleanup
|
||||
await harness.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user