#!/usr/bin/env python3
"""
MCP Server for Desktop Control

Provides screen capture, mouse, and keyboard control via pyautogui.
Uses stdio JSON-RPC for MCP protocol: one JSON request per line on stdin,
one JSON response per line on stdout. All logging goes to stderr.
"""

import json
import logging
import os
import platform
import sys
from typing import Any, Dict, List, Optional

# Set up logging to stderr (stdout is reserved for JSON-RPC traffic)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stderr
)
logger = logging.getLogger('desktop-control-mcp')

# Import pyautogui for desktop control
try:
    import pyautogui
    # Configure pyautogui for safety
    pyautogui.FAILSAFE = True
    pyautogui.PAUSE = 0.1
    PYAUTOGUI_AVAILABLE = True
except ImportError:
    logger.error("pyautogui not available - desktop control will be limited")
    PYAUTOGUI_AVAILABLE = False
except Exception as e:
    # Handle headless environments and other display-related errors
    logger.warning(f"pyautogui import failed (likely headless environment): {e}")
    PYAUTOGUI_AVAILABLE = False

# JSON-RPC 2.0 error codes used by this server
_PARSE_ERROR = -32700
_INVALID_REQUEST = -32600
_METHOD_NOT_FOUND = -32601
_INVALID_PARAMS = -32602
_INTERNAL_ERROR = -32603


def _tool(
    name: str,
    description: str,
    properties: Optional[Dict[str, Any]] = None,
    required: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build one MCP tool descriptor; "required" is omitted when empty."""
    schema: Dict[str, Any] = {
        "type": "object",
        "properties": dict(properties or {}),
    }
    if required:
        schema["required"] = list(required)
    return {"name": name, "description": description, "inputSchema": schema}


def _xy_properties() -> Dict[str, Any]:
    """Return a fresh schema fragment for an (x, y) screen coordinate pair."""
    return {
        "x": {"type": "integer", "description": "X coordinate"},
        "y": {"type": "integer", "description": "Y coordinate"},
    }


def _tool_result(payload: Dict[str, Any], is_error: bool) -> Dict[str, Any]:
    """Wrap a JSON-serializable payload in the MCP tools/call result shape."""
    return {
        "content": [
            {
                "type": "text",
                "text": json.dumps(payload)
            }
        ],
        "isError": is_error
    }


def _error_response(req_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response envelope."""
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "error": {
            "code": code,
            "message": message
        }
    }


class DesktopControlMCPServer:
    """MCP Server providing desktop control capabilities."""

    def __init__(self):
        self.tools = self._define_tools()

    def _define_tools(self) -> List[Dict[str, Any]]:
        """Define the available tools for this MCP server."""
        xy = ["x", "y"]
        drag_properties = _xy_properties()
        drag_properties["duration"] = {
            "type": "number",
            "description": "Duration of drag in seconds",
            "default": 0.5,
        }
        return [
            _tool(
                "take_screenshot",
                "Capture a screenshot and save it to the specified path",
                {
                    "path": {
                        "type": "string",
                        "description": "File path to save the screenshot",
                    }
                },
                ["path"],
            ),
            _tool("get_screen_size", "Get the current screen dimensions"),
            _tool("get_mouse_position", "Get the current mouse cursor position"),
            _tool(
                "pixel_color",
                "Get the RGB color of a pixel at the specified coordinates",
                _xy_properties(),
                xy,
            ),
            _tool(
                "click",
                "Perform a left mouse click at the specified coordinates",
                _xy_properties(),
                xy,
            ),
            _tool(
                "right_click",
                "Perform a right mouse click at the specified coordinates",
                _xy_properties(),
                xy,
            ),
            _tool(
                "move_to",
                "Move the mouse cursor to the specified coordinates",
                _xy_properties(),
                xy,
            ),
            _tool(
                "drag_to",
                "Drag the mouse to the specified coordinates with optional duration",
                drag_properties,
                xy,
            ),
            _tool(
                "type_text",
                "Type the specified text string",
                {"text": {"type": "string", "description": "Text to type"}},
                ["text"],
            ),
            _tool(
                "press_key",
                "Press a single key",
                {
                    "key": {
                        "type": "string",
                        "description": "Key to press (e.g., 'enter', 'space', 'a', 'f1')",
                    }
                },
                ["key"],
            ),
            _tool(
                "hotkey",
                "Press a key combination (space-separated keys)",
                {
                    "keys": {
                        "type": "string",
                        "description": "Space-separated keys (e.g., 'ctrl alt t')",
                    }
                },
                ["keys"],
            ),
            _tool(
                "scroll",
                "Scroll the mouse wheel",
                {
                    "amount": {
                        "type": "integer",
                        "description": "Amount to scroll (positive for up, negative for down)",
                    }
                },
                ["amount"],
            ),
            _tool("get_os", "Get information about the operating system"),
        ]

    def handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the initialize request.

        Returns the protocol version, server info and capabilities.
        ``params`` is accepted for interface symmetry but not inspected.
        """
        logger.info("Received initialize request")
        return {
            "protocolVersion": "2024-11-05",
            "serverInfo": {
                "name": "desktop-control-mcp",
                "version": "1.0.0"
            },
            "capabilities": {
                "tools": {}
            }
        }

    def handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the tools/list request by returning every tool descriptor."""
        return {"tools": self.tools}

    def handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the tools/call request.

        Args:
            params: Request params holding the tool ``name`` and an optional
                ``arguments`` object (missing or null means no arguments).

        Returns:
            An MCP tool result. Failures are reported in-band with
            ``isError`` set to True instead of being raised.
        """
        tool_name = params.get("name", "")
        # A null "arguments" value is treated the same as an absent one.
        arguments = params.get("arguments") or {}

        logger.info("Tool call: %s with args: %s", tool_name, arguments)

        # Report unknown tools as such even when pyautogui is unavailable;
        # otherwise the caller gets a misleading "pyautogui not available".
        if not any(tool["name"] == tool_name for tool in self.tools):
            return _tool_result({"error": f"Unknown tool: {tool_name}"}, True)

        if not PYAUTOGUI_AVAILABLE and tool_name != "get_os":
            return _tool_result({"error": "pyautogui not available"}, True)

        try:
            result = self._execute_tool(tool_name, arguments)
            return _tool_result(result, False)
        except Exception as e:
            # Boundary handler: a failing tool must not take down the server.
            logger.error(f"Error executing tool {tool_name}: {e}")
            return _tool_result({"error": str(e)}, True)

    def _execute_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the specified tool with the given arguments.

        Raises:
            ValueError: If ``name`` is not a known tool.
            Exception: Whatever the underlying pyautogui call raises.
        """
        if name == "take_screenshot":
            path = args.get("path", "screenshot.png")
            screenshot = pyautogui.screenshot()
            screenshot.save(path)
            return {"success": True, "path": path}

        if name == "get_screen_size":
            width, height = pyautogui.size()
            return {"width": width, "height": height}

        if name == "get_mouse_position":
            x, y = pyautogui.position()
            return {"x": x, "y": y}

        if name == "pixel_color":
            x = args.get("x", 0)
            y = args.get("y", 0)
            color = pyautogui.pixel(x, y)
            return {"r": color[0], "g": color[1], "b": color[2], "rgb": list(color)}

        if name == "click":
            x = args.get("x")
            y = args.get("y")
            pyautogui.click(x, y)
            return {"success": True, "x": x, "y": y}

        if name == "right_click":
            x = args.get("x")
            y = args.get("y")
            pyautogui.rightClick(x, y)
            return {"success": True, "x": x, "y": y}

        if name == "move_to":
            x = args.get("x")
            y = args.get("y")
            pyautogui.moveTo(x, y)
            return {"success": True, "x": x, "y": y}

        if name == "drag_to":
            x = args.get("x")
            y = args.get("y")
            duration = args.get("duration", 0.5)
            pyautogui.dragTo(x, y, duration=duration)
            return {"success": True, "x": x, "y": y, "duration": duration}

        if name == "type_text":
            text = args.get("text", "")
            pyautogui.typewrite(text)
            return {"success": True, "text": text}

        if name == "press_key":
            key = args.get("key", "")
            pyautogui.press(key)
            return {"success": True, "key": key}

        if name == "hotkey":
            keys_str = args.get("keys", "")
            keys = keys_str.split()
            pyautogui.hotkey(*keys)
            return {"success": True, "keys": keys}

        if name == "scroll":
            amount = args.get("amount", 0)
            pyautogui.scroll(amount)
            return {"success": True, "amount": amount}

        if name == "get_os":
            return {
                "system": platform.system(),
                "release": platform.release(),
                "version": platform.version(),
                "machine": platform.machine(),
                "processor": platform.processor(),
                "platform": platform.platform()
            }

        raise ValueError(f"Unknown tool: {name}")

    def process_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process an MCP request and return the response.

        Args:
            request: The decoded JSON-RPC message.

        Returns:
            The JSON-RPC response envelope, or None when the message is a
            notification (it has no "id" member) and must not be answered.
        """
        if not isinstance(request, dict):
            # Batches and scalar payloads are not supported.
            return _error_response(None, _INVALID_REQUEST, "Invalid Request")

        # JSON-RPC 2.0: a request without an "id" is a notification and the
        # server must never reply to it (e.g. MCP "notifications/initialized").
        is_notification = "id" not in request
        req_id = request.get("id")
        method = request.get("method", "")

        if not isinstance(method, str):
            if is_notification:
                return None
            return _error_response(req_id, _INVALID_REQUEST, "Invalid Request")

        params = request.get("params")
        if params is None:
            params = {}
        elif not isinstance(params, dict):
            if is_notification:
                return None
            return _error_response(req_id, _INVALID_PARAMS, "Invalid params")

        if method == "initialize":
            result = self.handle_initialize(params)
        elif method == "tools/list":
            result = self.handle_tools_list(params)
        elif method == "tools/call":
            result = self.handle_tools_call(params)
        elif method == "ping":
            # MCP liveness check: answered with an empty result.
            result = {}
        else:
            if is_notification:
                logger.info("Ignoring notification: %s", method)
                return None
            return _error_response(
                req_id, _METHOD_NOT_FOUND, f"Method not found: {method}"
            )

        if is_notification:
            return None

        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": result
        }


def main():
    """Main entry point for the MCP server.

    Reads newline-delimited JSON-RPC messages from stdin until EOF or
    Ctrl-C and writes one JSON response per line to stdout.
    """
    logger.info("Desktop Control MCP Server starting...")

    server = DesktopControlMCPServer()

    # Check if running in a TTY (for testing)
    if sys.stdin.isatty():
        logger.info("Running in interactive mode (for testing)")
        print("Desktop Control MCP Server", file=sys.stderr)
        print("Enter JSON-RPC requests (one per line):", file=sys.stderr)

    try:
        for raw_line in sys.stdin:
            line = raw_line.strip()
            if not line:
                continue

            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON: {e}")
                print(
                    json.dumps(_error_response(None, _PARSE_ERROR, "Parse error")),
                    flush=True,
                )
                continue

            try:
                response = server.process_request(request)
            except Exception:
                # Isolate failures per request so one bad message cannot
                # terminate the whole server loop.
                logger.exception("Unhandled error while processing request")
                if isinstance(request, dict) and "id" in request:
                    response = _error_response(
                        request.get("id"), _INTERNAL_ERROR, "Internal error"
                    )
                else:
                    response = None

            if response is not None:
                print(json.dumps(response), flush=True)

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down...")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")

    logger.info("Desktop Control MCP Server stopped.")


if __name__ == "__main__":
    main()