目前暂不支持通过EMR OpenAPI按照时间和负载弹性伸缩。但是,您可以添加触发逻辑,调用EMR OpenAPI实现对现有集群的扩容。

前提条件

使用场景

您已经在EMR中创建了一个Hadoop集群,包括Master、Core和Task类型的节点。希望通过OpenAPI实现对Task类型节点的添加。

现有集群和扩容节点的基本配置为:
  • 集群名为emr_openapi_demo, 集群ID为C-69CB0546800F****。
  • 需要扩容的是Task机器组,新增4个节点,每个节点为ecs.c5.xlarge,系统盘为120 GB*1的ESSD,数据盘为80 GB*4的高效云盘。
    扩容程序执行成功后,您可通过如下步骤在控制台查看新增的节点。
    1. 登录阿里云E-MapReduce控制台
    2. 单击上方的集群管理页签。
    3. 集群管理页面的集群列表中,单击对应集群所在行的详情
    4. 单击左侧导航栏的主机列表

      主机列表页面,可查看新增的节点。

  • 集群的Task机器组的GroupId为G-C73605CF4382****。
    说明 在集群扩容时,需要知道当前集群中待扩容机器组的GroupId。此参数需要您通过OpenAPI从集群信息中解析获取。

示例

Java和Python示例如下:
  • Java
    1. 根据集群所在Region和集群ID,获取该集群的Task机器组的GroupId。
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class DescribeClusterV2 {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              DescribeClusterV2Request request = new DescribeClusterV2Request();
              request.setRegionId("cn-hangzhou");
              request.setId("C-69CB0546800F****");
      
              try {
                  DescribeClusterV2Response response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }

      在得到的返回结果中找到Task机器组的GroupId。

      返回结果以JSON为例,GroupId的查找路径如下。
      ClusterInfo -> HostGroupList -> HostGroup -> HostGroupType=TASK -> HostGroupId
    2. 扩容代码。
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class ResizeClusterV2 {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              ResizeClusterV2Request request = new ResizeClusterV2Request();
              request.setRegionId("cn-hangzhou");
              request.setClusterId("C-69CB0546800F****");
      
              List<ResizeClusterV2Request.HostGroup> hostGroupList = new ArrayList<ResizeClusterV2Request.HostGroup>();
      
              ResizeClusterV2Request.HostGroup hostGroup1 = new ResizeClusterV2Request.HostGroup();
              hostGroup1.setClusterId("C-69CB0546800F****");
              hostGroup1.setHostGroupId("G-C73605CF4382****");
              hostGroup1.setHostGroupName("task_group");
              hostGroup1.setHostGroupType("TASK");
              hostGroup1.setNodeCount(4);
              hostGroup1.setInstanceType("ecs.c5.xlarge");
              hostGroupList.add(hostGroup1);
              request.setHostGroups(hostGroupList);
      
              try {
                  ResizeClusterV2Response response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
  • Python
    1. 根据集群所在Region和集群ID,获取该集群的Task机器组的GroupId。
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.DescribeClusterV2Request import DescribeClusterV2Request
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = DescribeClusterV2Request()
      request.set_accept_format('json')
      
      request.set_Id("C-69CB0546800F****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))

      在得到的返回结果中找到Task机器组的GroupId。

      返回结果以JSON为例,GroupId的查找路径如下。
      ClusterInfo -> HostGroupList -> HostGroup -> HostGroupType=TASK -> HostGroupId
    2. 扩容代码。
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.ResizeClusterV2Request import ResizeClusterV2Request
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = ResizeClusterV2Request()
      request.set_accept_format('json')
      
      request.set_ClusterId("C-69CB0546800F****")
      request.set_HostGroups([
        {
          "ClusterId": "C-69CB0546800F****",
          "HostGroupId": "G-C73605CF4382****",
          "HostGroupName": "task_group",
          "HostGroupType": "TASK",
          "NodeCount": 4,
          "InstanceType": "ecs.c5.xlarge"
        }
      ])
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))

相关OpenAPI