Files
the-nexus/mcp_servers/desktop_control_server.py

413 lines
14 KiB
Python
Raw Permalink Normal View History

feat: Complete Bannerlord MCP Harness implementation (Issue #722) Implements the Hermes observation/control path for local Bannerlord per GamePortal Protocol. ## New Components - nexus/bannerlord_harness.py (874 lines) - MCPClient for JSON-RPC communication with MCP servers - capture_state() → GameState with visual + Steam context - execute_action() → ActionResult for all input types - observe-decide-act loop with telemetry through Hermes WS - Bannerlord-specific actions (inventory, party, save/load) - Mock mode for testing without game running - mcp_servers/desktop_control_server.py (14KB) - 13 desktop automation tools via pyautogui - Screenshot, mouse, keyboard control - Headless environment support - mcp_servers/steam_info_server.py (18KB) - 6 Steam Web API tools - Mock mode without API key, live mode with STEAM_API_KEY - tests/test_bannerlord_harness.py (37 tests, all passing) - GameState/ActionResult validation - Mock mode action tests - ODA loop tests - GamePortal Protocol compliance tests - docs/BANNERLORD_HARNESS_PROOF.md - Architecture documentation - Proof of ODA loop execution - Telemetry flow diagrams - examples/harness_demo.py - Runnable demo showing full ODA loop ## Updates - portals.json: Bannerlord metadata per GAMEPORTAL_PROTOCOL.md - status: active, portal_type: game-world - app_id: 261550, window_title: 'Mount & Blade II: Bannerlord' - telemetry_source: hermes-harness:bannerlord ## Verification pytest tests/test_bannerlord_harness.py -v 37 passed, 2 skipped, 11 warnings Closes #722
2026-03-31 04:53:29 +00:00
#!/usr/bin/env python3
"""
MCP Server for Desktop Control
Provides screen capture, mouse, and keyboard control via pyautogui.
Uses stdio JSON-RPC for MCP protocol.
"""
import json
import sys
import logging
import os
from typing import Any, Dict, List, Optional
# Set up logging to stderr (stdout is for JSON-RPC)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stderr
)
logger = logging.getLogger('desktop-control-mcp')
# Import pyautogui for desktop control
try:
import pyautogui
# Configure pyautogui for safety
pyautogui.FAILSAFE = True
pyautogui.PAUSE = 0.1
PYAUTOGUI_AVAILABLE = True
except ImportError:
logger.error("pyautogui not available - desktop control will be limited")
PYAUTOGUI_AVAILABLE = False
except Exception as e:
# Handle headless environments and other display-related errors
logger.warning(f"pyautogui import failed (likely headless environment): {e}")
PYAUTOGUI_AVAILABLE = False
class DesktopControlMCPServer:
"""MCP Server providing desktop control capabilities."""
def __init__(self):
self.tools = self._define_tools()
def _define_tools(self) -> List[Dict[str, Any]]:
"""Define the available tools for this MCP server."""
return [
{
"name": "take_screenshot",
"description": "Capture a screenshot and save it to the specified path",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File path to save the screenshot"
}
},
"required": ["path"]
}
},
{
"name": "get_screen_size",
"description": "Get the current screen dimensions",
"inputSchema": {
"type": "object",
"properties": {}
}
},
{
"name": "get_mouse_position",
"description": "Get the current mouse cursor position",
"inputSchema": {
"type": "object",
"properties": {}
}
},
{
"name": "pixel_color",
"description": "Get the RGB color of a pixel at the specified coordinates",
"inputSchema": {
"type": "object",
"properties": {
"x": {"type": "integer", "description": "X coordinate"},
"y": {"type": "integer", "description": "Y coordinate"}
},
"required": ["x", "y"]
}
},
{
"name": "click",
"description": "Perform a left mouse click at the specified coordinates",
"inputSchema": {
"type": "object",
"properties": {
"x": {"type": "integer", "description": "X coordinate"},
"y": {"type": "integer", "description": "Y coordinate"}
},
"required": ["x", "y"]
}
},
{
"name": "right_click",
"description": "Perform a right mouse click at the specified coordinates",
"inputSchema": {
"type": "object",
"properties": {
"x": {"type": "integer", "description": "X coordinate"},
"y": {"type": "integer", "description": "Y coordinate"}
},
"required": ["x", "y"]
}
},
{
"name": "move_to",
"description": "Move the mouse cursor to the specified coordinates",
"inputSchema": {
"type": "object",
"properties": {
"x": {"type": "integer", "description": "X coordinate"},
"y": {"type": "integer", "description": "Y coordinate"}
},
"required": ["x", "y"]
}
},
{
"name": "drag_to",
"description": "Drag the mouse to the specified coordinates with optional duration",
"inputSchema": {
"type": "object",
"properties": {
"x": {"type": "integer", "description": "X coordinate"},
"y": {"type": "integer", "description": "Y coordinate"},
"duration": {"type": "number", "description": "Duration of drag in seconds", "default": 0.5}
},
"required": ["x", "y"]
}
},
{
"name": "type_text",
"description": "Type the specified text string",
"inputSchema": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "Text to type"}
},
"required": ["text"]
}
},
{
"name": "press_key",
"description": "Press a single key",
"inputSchema": {
"type": "object",
"properties": {
"key": {"type": "string", "description": "Key to press (e.g., 'enter', 'space', 'a', 'f1')"}
},
"required": ["key"]
}
},
{
"name": "hotkey",
"description": "Press a key combination (space-separated keys)",
"inputSchema": {
"type": "object",
"properties": {
"keys": {"type": "string", "description": "Space-separated keys (e.g., 'ctrl alt t')"}
},
"required": ["keys"]
}
},
{
"name": "scroll",
"description": "Scroll the mouse wheel",
"inputSchema": {
"type": "object",
"properties": {
"amount": {"type": "integer", "description": "Amount to scroll (positive for up, negative for down)"}
},
"required": ["amount"]
}
},
{
"name": "get_os",
"description": "Get information about the operating system",
"inputSchema": {
"type": "object",
"properties": {}
}
}
]
def handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle the initialize request."""
logger.info("Received initialize request")
return {
"protocolVersion": "2024-11-05",
"serverInfo": {
"name": "desktop-control-mcp",
"version": "1.0.0"
},
"capabilities": {
"tools": {}
}
}
def handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle the tools/list request."""
return {"tools": self.tools}
def handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle the tools/call request."""
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
logger.info(f"Tool call: {tool_name} with args: {arguments}")
if not PYAUTOGUI_AVAILABLE and tool_name != "get_os":
return {
"content": [
{
"type": "text",
"text": json.dumps({"error": "pyautogui not available"})
}
],
"isError": True
}
try:
result = self._execute_tool(tool_name, arguments)
return {
"content": [
{
"type": "text",
"text": json.dumps(result)
}
],
"isError": False
}
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
return {
"content": [
{
"type": "text",
"text": json.dumps({"error": str(e)})
}
],
"isError": True
}
def _execute_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute the specified tool with the given arguments."""
if name == "take_screenshot":
path = args.get("path", "screenshot.png")
screenshot = pyautogui.screenshot()
screenshot.save(path)
return {"success": True, "path": path}
elif name == "get_screen_size":
width, height = pyautogui.size()
return {"width": width, "height": height}
elif name == "get_mouse_position":
x, y = pyautogui.position()
return {"x": x, "y": y}
elif name == "pixel_color":
x = args.get("x", 0)
y = args.get("y", 0)
color = pyautogui.pixel(x, y)
return {"r": color[0], "g": color[1], "b": color[2], "rgb": list(color)}
elif name == "click":
x = args.get("x")
y = args.get("y")
pyautogui.click(x, y)
return {"success": True, "x": x, "y": y}
elif name == "right_click":
x = args.get("x")
y = args.get("y")
pyautogui.rightClick(x, y)
return {"success": True, "x": x, "y": y}
elif name == "move_to":
x = args.get("x")
y = args.get("y")
pyautogui.moveTo(x, y)
return {"success": True, "x": x, "y": y}
elif name == "drag_to":
x = args.get("x")
y = args.get("y")
duration = args.get("duration", 0.5)
pyautogui.dragTo(x, y, duration=duration)
return {"success": True, "x": x, "y": y, "duration": duration}
elif name == "type_text":
text = args.get("text", "")
pyautogui.typewrite(text)
return {"success": True, "text": text}
elif name == "press_key":
key = args.get("key", "")
pyautogui.press(key)
return {"success": True, "key": key}
elif name == "hotkey":
keys_str = args.get("keys", "")
keys = keys_str.split()
pyautogui.hotkey(*keys)
return {"success": True, "keys": keys}
elif name == "scroll":
amount = args.get("amount", 0)
pyautogui.scroll(amount)
return {"success": True, "amount": amount}
elif name == "get_os":
import platform
return {
"system": platform.system(),
"release": platform.release(),
"version": platform.version(),
"machine": platform.machine(),
"processor": platform.processor(),
"platform": platform.platform()
}
else:
raise ValueError(f"Unknown tool: {name}")
def process_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Process an MCP request and return the response."""
method = request.get("method", "")
params = request.get("params", {})
req_id = request.get("id")
if method == "initialize":
result = self.handle_initialize(params)
elif method == "tools/list":
result = self.handle_tools_list(params)
elif method == "tools/call":
result = self.handle_tools_call(params)
else:
# Unknown method
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
return {
"jsonrpc": "2.0",
"id": req_id,
"result": result
}
def main():
"""Main entry point for the MCP server."""
logger.info("Desktop Control MCP Server starting...")
server = DesktopControlMCPServer()
# Check if running in a TTY (for testing)
if sys.stdin.isatty():
logger.info("Running in interactive mode (for testing)")
print("Desktop Control MCP Server", file=sys.stderr)
print("Enter JSON-RPC requests (one per line):", file=sys.stderr)
try:
while True:
# Read line from stdin
line = sys.stdin.readline()
if not line:
break
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
response = server.process_request(request)
if response:
print(json.dumps(response), flush=True)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": "Parse error"
}
}
print(json.dumps(error_response), flush=True)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, shutting down...")
except Exception as e:
logger.error(f"Unexpected error: {e}")
logger.info("Desktop Control MCP Server stopped.")
if __name__ == "__main__":
main()