全部产品
Search
文档中心

开源大数据平台E-MapReduce:样例代码

更新时间:Sep 21, 2023

本文介绍如何快速使用EMR Python SDK完成常见操作,例如创建集群、查询集群列表和扩缩容节点组等。

说明
  • 请确保在代码运行环境设置了环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体配置方法,请参见配置方案

  • 本文为您介绍Python 2的样例代码。如果您使用的是Python 3,则请参见常用API教程下的示例。

创建集群

#!/usr/bin/python
from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import CreateClusterRequest
clt = client.AcsClient(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),'cn-hangzhou') # set accessId and accessKey
request = CreateClusterRequest.CreateClusterRequest()
request.set_Name("pydemo")
request.set_ZoneId("cn-hangzhou-b")
request.set_LogEnable(False)
request.set_SecurityGroupId("sg-********")
request.set_IsOpenPublicIp(True)
request.set_ChargeType("PostPaid")
request.set_EmrVer("EMR-1.3.0")
request.set_ClusterType("HADOOP")
request.set_IoOptimized(True)
request.set_InstanceGeneration("ecs-2")
# set EcsOrder
request.add_query_param('EcsOrder.1.NodeCount', '1')
request.add_query_param('EcsOrder.1.NodeType', 'MASTER')
request.add_query_param('EcsOrder.1.InstanceType', 'ecs.n1.large')
request.add_query_param('EcsOrder.1.DiskType', 'CLOUD_EFFICIENCY')
request.add_query_param('EcsOrder.1.DiskCapacity', '80')
request.add_query_param('EcsOrder.1.DiskCount', '1')
request.add_query_param('EcsOrder.1.Index', '1')
request.add_query_param('EcsOrder.2.NodeCount', '3')
request.add_query_param('EcsOrder.2.NodeType', 'CORE')
request.add_query_param('EcsOrder.2.InstanceType', 'ecs.n1.large')
request.add_query_param('EcsOrder.2.DiskType', 'CLOUD_EFFICIENCY')
request.add_query_param('EcsOrder.2.DiskCapacity', '80')
request.add_query_param('EcsOrder.2.DiskCount', '4')
request.add_query_param('EcsOrder.2.Index', '2')
request.set_accept_format('json')
result = clt.do_action(request)
print result
重要

由于阿里云产品众多,所有SDK的生成是通过程序自动生成的,所以部分SDK的使用比较不方便。在目前Python SDK中,对入参是List(不管是基本类型的List还是复杂对象的List)支持不好,需要特殊处理。

  • 如果是基本类型的List入参,请参见查看集群列表示例代码中对StatusList参数的设置。

  • 如果是复杂对象的List入参,请参见创建集群示例代码中对EcsOrder参数的设置。

  • 其他List入参,例如BootstrapAction参数,推荐您使用Java SDK,使用Java SDK会更加方便。

其他接口操作类似,具体请参见API概览

查看集群列表

#!/usr/bin/python
from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ListClustersRequest
clt = client.AcsClient(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),'cn-hangzhou') # set accessId and accessKey
request = ListClustersRequest.ListClustersRequest()
request.set_accept_format('xml') # xml or json
# 设置状态过滤,只查找RUNNING和IDLE的集群,注意该参数为可选参数,可以不设置。
request.add_query_param('StatusList.1', 'RUNNING')
request.add_query_param('StatusList.2', 'IDLE')
result = clt.do_action(request)
print result

扩容节点组

通过调整指定节点组数量扩容节点组。

#!/usr/bin/python
from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ResizeClusterV2Request

clt = client.AcsClient(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),'cn-hangzhou')
request = ResizeClusterV2Request.ResizeClusterV2Request()
request.set_accept_format('json')

# 集群ID
request.set_ClusterId("C-01A1F4A********")
# 待扩容节点组信息,支持同时对多个节点组扩容从1开始计数。
request.add_query_param('HostGroup.1.HostGroupId', 'G-F0D0661E0A6E****')
request.add_query_param('HostGroup.1.NodeCount', 1)
#request.add_query_param('HostGroup.2.HostGroupId', 'G-F0D0661E0A6****')
#request.add_query_param('HostGroup.2.NodeCount', 3)

result = clt.do_action(request)
print(result)

缩容节点组

您可以选择通过调整指定节点组数量缩容节点组或通过实例ID缩容节点组。

通过调整指定节点组数量缩容节点组

重要

使用该特性,您需要通过以下命令升级您的SDK版本。

sudo pip install aliyun-python-sdk-emr --upgrade
#!/usr/bin/python
from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ReleaseClusterHostGroupRequest

clt = client.AcsClient(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),'cn-hangzhou')
request = ReleaseClusterHostGroupRequest.ReleaseClusterHostGroupRequest()
request.set_accept_format('json')

# 集群ID。
request.set_ClusterId("C-01A1F4A********")
# 节点组ID,可通过ListClusterHostGroup接口获取节点组ID。
request.set_HostGroupId("G-D11D3E*******")

# 缩容数量。
request.set_ReleaseNumber(1)
# 可开启YARN Decommission,目前只允许对HADOOP集群的Task组开启。
request.set_EnableGracefulDecommission(True)
request.set_DecommissionTimeout(60)

result = clt.do_action(request)
print(result)

如果执行上面代码返回如下异常信息,则需要修改相关的配置信息。异常信息

  • 控制台方式:您可以在EMR控制台YARN服务的配置页面,搜索参数yarn.resourcemanager.nodes.exclude-path,修改参数值为/etc/ecm/hadoop-conf/yarn-exclude.xml。然后配置并部署配置,使配置生效。

  • 代码方式:您也可以通过以下代码完成修改。

    request = ModifyClusterServiceConfigRequest.ModifyClusterServiceConfigRequest()
    request.set_accept_format('json') # 返回结果格式为JSON。
    request.set_ClusterId("C-01A1F4A********")
    request.set_ServiceName("YARN")
    request.set_ConfigParams('{"yarn-site":{"yarn.resourcemanager.nodes.exclude-path":"/etc/ecm/hadoop-conf/yarn-exclude.xml"}}')
    request.set_Comment('for decommission gracefully')
    request.set_RefreshHostConfig(True)
    result = clt.do_action(request)
    print(result)

通过实例ID缩容节点组

#!/usr/bin/python
from aliyunsdkcore import client
from aliyunsdkemr.request.v20160408 import ReleaseClusterHostGroupRequest

clt = client.AcsClient(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),'cn-hangzhou')
request = ReleaseClusterHostGroupRequest.ReleaseClusterHostGroupRequest()
request.set_accept_format('json')

# 集群ID。
request.set_ClusterId("C-01A1F4A********")
# 节点组ID,可通过ListClusterHostGroup接口获取节点组ID。
request.set_HostGroupId("G-D11D3E*******")

# 可以指定释放某些ECS,其与ReleaseNumber同时设置时以InstanceIdList为准。
request.set_InstanceIdList(["i-1**", "i-2**"])

# 可开启YARN Decommission,目前只允许对HADOOP集群的Task组开启。
request.set_EnableGracefulDecommission(True)
# 优雅下线超时时长,单位为秒。不填写时默认超时时间为3600s。
request.set_DecommissionTimeout(60)
result = clt.do_action(request)
print(result)