E-MapReduce: Sample code

Last Updated: Sep 21, 2023

This topic describes how to use E-MapReduce (EMR) SDK for Python to perform common operations. For example, you can create clusters, query clusters, and scale in or scale out node groups.

Note
  • Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configuration methods.

  • This topic provides sample code for Python 2. If you use Python 3, you can use the sample code in Common API instructions.
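
The environment variable check in the note above can be sketched as a small helper. This is an illustrative sketch rather than part of the SDK: the function name load_credentials and its environ argument are hypothetical, and only the two variable names come from this topic.

```python
import os


def load_credentials(environ=None):
    """Return (access_key_id, access_key_secret) read from environment variables.

    load_credentials is a hypothetical helper, not an SDK function.
    """
    env = os.environ if environ is None else environ
    names = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    # Fail early with a readable message instead of a bare KeyError.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]
```

The returned pair can be passed as the first two arguments of client.AcsClient, followed by the region ID.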

Create a cluster

#!/usr/bin/python
import os

from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import CreateClusterRequest

# Read the AccessKey ID and AccessKey secret from environment variables.
clt = client.AcsClient(os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"], os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"], 'cn-hangzhou')
request = CreateClusterRequest.CreateClusterRequest()
request.set_Name("pydemo")
request.set_ZoneId("cn-hangzhou-b")
request.set_LogEnable(False)
request.set_SecurityGroupId("sg-********")
request.set_IsOpenPublicIp(True)
request.set_ChargeType("PostPaid")
request.set_EmrVer("EMR-1.3.0")
request.set_ClusterType("HADOOP")
request.set_IoOptimized(True)
request.set_InstanceGeneration("ecs-2")
# set EcsOrder
request.add_query_param('EcsOrder.1.NodeCount', '1')
request.add_query_param('EcsOrder.1.NodeType', 'MASTER')
request.add_query_param('EcsOrder.1.InstanceType', 'ecs.n1.large')
request.add_query_param('EcsOrder.1.DiskType', 'CLOUD_EFFICIENCY')
request.add_query_param('EcsOrder.1.DiskCapacity', '80')
request.add_query_param('EcsOrder.1.DiskCount', '1')
request.add_query_param('EcsOrder.1.Index', '1')
request.add_query_param('EcsOrder.2.NodeCount', '3')
request.add_query_param('EcsOrder.2.NodeType', 'CORE')
request.add_query_param('EcsOrder.2.InstanceType', 'ecs.n1.large')
request.add_query_param('EcsOrder.2.DiskType', 'CLOUD_EFFICIENCY')
request.add_query_param('EcsOrder.2.DiskCapacity', '80')
request.add_query_param('EcsOrder.2.DiskCount', '4')
request.add_query_param('EcsOrder.2.Index', '2')
request.set_accept_format('json')
result = clt.do_action(request)
print(result)

Important

Because Alibaba Cloud provides a large number of services, the SDKs of these services are automatically generated, and the SDKs of specific services are inconvenient to use. In most scenarios, SDK for Python does not support input parameters of the LIST type. If you use input parameters of the LIST type, you must expand each list into numbered query parameters yourself.

  • If an input parameter is a list that contains values of basic data types, see how the StatusList parameter is configured in the sample code in the Query clusters section.

  • If an input parameter is a list that contains values of complex data types, see how the EcsOrder parameter is configured in the sample code in the Create a cluster section.

  • In other scenarios, we recommend that you use the SDK for Java to process input parameters of the LIST type, such as the BootstrapAction parameter, because SDKs for Java are easier to use.

The methods that are used to call other API operations are similar to the methods that are used to call the preceding operation. For more information, see List of operations by function.
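
The numbered naming pattern used for the StatusList and EcsOrder parameters in this topic can be generated by a small helper instead of being written by hand. The following is a minimal sketch, not an SDK feature: flatten_list_param is a hypothetical name, and the sketch assumes that list items are either basic values or flat dictionaries.

```python
def flatten_list_param(name, values):
    """Expand a list into numbered (key, value) query parameter pairs.

    Basic values become Name.N and dictionary items become Name.N.Field.
    Indexes start at 1, as in the samples in this topic.
    flatten_list_param is a hypothetical helper, not an SDK function.
    """
    params = []
    for index, value in enumerate(values, 1):
        if isinstance(value, dict):
            # Sort the fields so that the output order is predictable.
            for field in sorted(value):
                params.append(("%s.%d.%s" % (name, index, field), str(value[field])))
        else:
            params.append(("%s.%d" % (name, index), str(value)))
    return params


# A list of basic values, as in the Query clusters sample.
status_params = flatten_list_param("StatusList", ["RUNNING", "IDLE"])
# [('StatusList.1', 'RUNNING'), ('StatusList.2', 'IDLE')]

# A list of complex values, as in the Create a cluster sample.
order_params = flatten_list_param("EcsOrder", [{"NodeType": "MASTER", "NodeCount": 1}])
# [('EcsOrder.1.NodeCount', '1'), ('EcsOrder.1.NodeType', 'MASTER')]
```

Each resulting pair can then be passed to request.add_query_param(key, value) in a loop.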

Query clusters

#!/usr/bin/python
import os

from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ListClustersRequest

# Read the AccessKey ID and AccessKey secret from environment variables.
clt = client.AcsClient(os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"], os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"], 'cn-hangzhou')
request = ListClustersRequest.ListClustersRequest()
request.set_accept_format('xml') # xml or json
# Specify filter conditions to obtain the clusters that are in the RUNNING state and the clusters that are in the IDLE state. The StatusList parameter is optional. 
request.add_query_param('StatusList.1', 'RUNNING')
request.add_query_param('StatusList.2', 'IDLE')
result = clt.do_action(request)
print(result)
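
The value returned by do_action is the raw response body. If the accept format is set to json instead of xml, the body can be parsed into a dictionary. The following is a minimal sketch under that assumption. parse_json_response is a hypothetical helper, and the sample body is made up for illustration. Apart from RequestId, the fields that a real response contains depend on the operation.

```python
import json


def parse_json_response(raw):
    """Decode the body returned by do_action and parse it as JSON.

    parse_json_response is a hypothetical helper, not an SDK function.
    """
    # The body may be a byte string, so decode it before parsing.
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return json.loads(raw)


# Hypothetical response body, used only for illustration.
sample = b'{"RequestId": "EXAMPLE-REQUEST-ID"}'
data = parse_json_response(sample)
print(data["RequestId"])
```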

Scale out a node group

To scale out a node group, increase the number of nodes in the node group.

#!/usr/bin/python
import os

from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ResizeClusterV2Request

# Read the AccessKey ID and AccessKey secret from environment variables.
clt = client.AcsClient(os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"], os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"], 'cn-hangzhou')
request = ResizeClusterV2Request.ResizeClusterV2Request()
request.set_accept_format('json')

# Specify the ID of the cluster.
request.set_ClusterId("C-01A1F4A********")
# Specify information about the node group that you want to scale out. You can scale out multiple node groups at the same time. The node groups are counted from 1. 
request.add_query_param('HostGroup.1.HostGroupId', 'G-F0D0661E0A6E****')
request.add_query_param('HostGroup.1.NodeCount', 1)
#request.add_query_param('HostGroup.2.HostGroupId', 'G-F0D0661E0A6****')
#request.add_query_param('HostGroup.2.NodeCount', 3)

result = clt.do_action(request)
print(result)

Scale in a node group

You can decrease the number of nodes in a node group to scale in the node group. You can also specify the IDs of nodes that you want to remove to scale in a node group.

Reduce the number of nodes in a node group to scale in the node group

Important

To scale in a node group by using this method, run the following command to upgrade the version of EMR SDK:

sudo pip install aliyun-python-sdk-emr --upgrade

#!/usr/bin/python
import os

from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ReleaseClusterHostGroupRequest

# Read the AccessKey ID and AccessKey secret from environment variables.
clt = client.AcsClient(os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"], os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"], 'cn-hangzhou')
request = ReleaseClusterHostGroupRequest.ReleaseClusterHostGroupRequest()
request.set_accept_format('json')

# Specify the ID of the cluster. 
request.set_ClusterId("C-01A1F4A********")
# Specify the ID of the node group. You can call the ListClusterHostGroup operation to obtain the ID of the node group. 
request.set_HostGroupId("G-D11D3E*******")

# Specify the number of nodes that you want to remove. 
request.set_ReleaseNumber(1)
# Enable YARN decommissioning. You can enable YARN decommissioning only for the task node group of a Hadoop cluster. 
request.set_EnableGracefulDecommission(True)
# Specify the timeout period for graceful decommissioning of nodes. Unit: seconds.
request.set_DecommissionTimeout(60)

result = clt.do_action(request)
print(result)

If the error message shown in the following figure is returned after you run the preceding code, change the value of the yarn.resourcemanager.nodes.exclude-path parameter to /etc/ecm/hadoop-conf/yarn-exclude.xml by using one of the following methods:

[Figure: Exception information]

  • Method 1: In the EMR console, search for the yarn.resourcemanager.nodes.exclude-path parameter on the Configure tab of the YARN service and change the value to /etc/ecm/hadoop-conf/yarn-exclude.xml. Then, save and deploy the configuration on the Configure tab.

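  • Method 2: Run the following code to change the value of the yarn.resourcemanager.nodes.exclude-path parameter:

    from aliyunsdkemr.request.v20160408 import ModifyClusterServiceConfigRequest

    # clt is the AcsClient instance that is created in the preceding sample code.
    request = ModifyClusterServiceConfigRequest.ModifyClusterServiceConfigRequest()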
    request.set_accept_format('json') # The returned result is in JSON format. 
    request.set_ClusterId("C-01A1F4A********")
    request.set_ServiceName("YARN")
    request.set_ConfigParams('{"yarn-site":{"yarn.resourcemanager.nodes.exclude-path":"/etc/ecm/hadoop-conf/yarn-exclude.xml"}}')
    request.set_Comment('for decommission gracefully')
    request.set_RefreshHostConfig(True)
    result = clt.do_action(request)
    print(result)
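
The ConfigParams value in Method 2 is a JSON string. Instead of writing the string by hand, it can be built from a dictionary, which avoids quoting mistakes when more configuration items are added. The following is a minimal sketch. build_config_params is a hypothetical helper, and the sketch assumes that the service accepts any valid JSON object of the same shape as the string shown above.

```python
import json


def build_config_params(config_file, items):
    """Serialize {config_file: {key: value}} into a compact JSON string.

    build_config_params is a hypothetical helper, not an SDK function.
    """
    # Compact separators reproduce the hand-written string used in Method 2.
    return json.dumps({config_file: dict(items)}, sort_keys=True, separators=(",", ":"))


config_params = build_config_params(
    "yarn-site",
    {"yarn.resourcemanager.nodes.exclude-path": "/etc/ecm/hadoop-conf/yarn-exclude.xml"},
)
print(config_params)
```

The resulting string can be passed to request.set_ConfigParams(config_params).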

Specify the IDs of the nodes that you want to remove to scale in a node group

#!/usr/bin/python
import os

from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ReleaseClusterHostGroupRequest

# Read the AccessKey ID and AccessKey secret from environment variables.
clt = client.AcsClient(os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"], os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"], 'cn-hangzhou')
request = ReleaseClusterHostGroupRequest.ReleaseClusterHostGroupRequest()
request.set_accept_format('json')

# Specify the ID of the cluster. 
request.set_ClusterId("C-01A1F4A********")
# Specify the ID of the node group. You can call the ListClusterHostGroup operation to obtain the ID of the node group. 
request.set_HostGroupId("G-D11D3E*******")

# Specify the IDs of nodes that you want to remove. If you configure both the InstanceIdList parameter and the ReleaseNumber parameter, the configuration of the InstanceIdList parameter prevails. 
request.set_InstanceIdList(["i-1**", "i-2**"])

# Enable YARN decommissioning. You can enable YARN decommissioning only for the task node group of a Hadoop cluster. 
request.set_EnableGracefulDecommission(True)
# Specify the timeout period for graceful decommissioning of nodes. Unit: seconds. If you do not configure this parameter, the default timeout period of 3,600 seconds is used.
request.set_DecommissionTimeout(60)
result = clt.do_action(request)
print(result)