All Products
Search
Document Center

E-MapReduce:Sample code

Last Updated:Sep 21, 2023

This topic describes how to use E-MapReduce (EMR) SDK for Java to perform common operations. For example, you can create clusters, create jobs, and scale in or scale out node groups.

Prerequisites

Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configuration methods.

Create a cluster

/**
 * Creates a pay-as-you-go HADOOP cluster in a VPC in the cn-hangzhou region with one
 * MASTER node and three CORE nodes.
 *
 * <p>The AccessKey pair is read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
 * ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final CreateClusterRequest request = new CreateClusterRequest();
    request.setName("Your-Cluster-Name");
    request.setSecurityGroupId("Your-Security-Group-Id"); // If no security group ID is specified, a new security group with a given name is created.
    request.setAutoRenew(false);
    request.setChargeType("PostPaid"); // Specify the billing method. In this example, the pay-as-you-go billing method is used.
    request.setClusterType("HADOOP"); // Specify the cluster type.
    request.setEmrVer("EMR-1.3.0"); // Specify the EMR version.
    request.setIsOpenPublicIp(true);
    request.setLogEnable(true);
    request.setLogPath("oss://Your-Bucket/Your-Folder");
    request.setMasterPwdEnable(true); // Enable password logon for the master node.
    request.setMasterPwd("Aa123456789"); // Specify the password for the master node.
    request.setZoneId("cn-hangzhou-b"); // Specify the zone ID.
    // The availability of hardware configurations, such as Elastic Compute Service (ECS) instance types and cloud disk types, is determined based on the specified I/O optimization parameter, ECS instance series, and network type. For more information, visit the ECS buy page.
    request.setIoOptimized(true); // Set this parameter to true to enable I/O optimization.
    request.setInstanceGeneration("ecs-2"); // Set this parameter to ecs-2. Valid values: ecs-1 and ecs-2.
    request.setNetType("vpc"); // Specify the network type.
    request.setVpcId("your-vpcId");
    request.setVSwitchId("your-switchId");

    List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();

    // Master node group: one node with two 50 GB disks.
    CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
    masterOrder.setIndex(1);
    masterOrder.setDiskCapacity(50);
    masterOrder.setDiskCount(2);
    masterOrder.setDiskType("CLOUD_EFFICIENCY"); // Specify the disk type.
    masterOrder.setInstanceType("ecs.n1.large"); // Specify the ECS instance type.
    masterOrder.setNodeCount(1);
    masterOrder.setNodeType("MASTER"); // Specify the node type. In this example, the master node is used.
    ecsOrders.add(masterOrder);

    // Core node group: three nodes, each with four 50 GB disks.
    CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
    coreOrder.setIndex(2);
    coreOrder.setDiskCapacity(50);
    coreOrder.setDiskCount(4);
    coreOrder.setDiskType("CLOUD_EFFICIENCY");
    coreOrder.setInstanceType("ecs.n1.large");
    coreOrder.setNodeCount(3);
    coreOrder.setNodeType("CORE");
    ecsOrders.add(coreOrder);

    request.setEcsOrders(ecsOrders);
    try {
        CreateClusterResponse response = client.getAcsResponse(request);
        String clusterId = response.getClusterId(); // ID of the new cluster.
        // Perform operations on the cluster.
    } catch (Exception e) {
        // Do not swallow the failure. Rethrow it with the original exception as the
        // cause so that the error details are not lost.
        throw new IllegalStateException("Failed to create the EMR cluster", e);
    }
}
  • When you create a cluster, you must specify a security group. You can specify the ID of an existing security group. If you do not, you must specify a security group name, and a security group with that name is automatically created during cluster creation.

  • Specify whether to enable high availability. For more information, see hardware configurations in Create a cluster.

    // Enable high availability for the cluster.
    request.setHighAvailabilityEnable(Boolean.TRUE);
  • Configure the settings of optional software components. For more information, see software configurations in Create a cluster.

    // Optional software components to install on the cluster.
    List<String> optionalSoftware = new ArrayList<String>();
    optionalSoftware.add("presto");
    optionalSoftware.add("oozie");
    request.setOptionSoftWareLists(optionalSoftware);
  • Specify other configuration items. For more information, see Customize software configurations.

    request.setConfigurations("oss://your-bucket/your-conf.json");
  • Configure bootstrap actions. For more information, see Bootstrap actions.

    // One bootstrap action that runs the run-if.py script from OSS.
    // NOTE(review): the argument presumably makes run-if.py run `mkdir -p /tmp/abc` only
    // where instance.isMaster=true. Confirm this against the Bootstrap actions documentation.
    CreateClusterRequest.BootstrapAction bootstrapAction = new CreateClusterRequest.BootstrapAction();
    bootstrapAction.setName("bootstrapName");
    bootstrapAction.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
    bootstrapAction.setArg("instance.isMaster=true mkdir -p /tmp/abc");

    List<CreateClusterRequest.BootstrapAction> bootstrapActions = new ArrayList<CreateClusterRequest.BootstrapAction>();
    bootstrapActions.add(bootstrapAction);
    request.setBootstrapActions(bootstrapActions);

Query the details of a cluster

/**
 * Queries the details of the EMR cluster with the specified ID in the cn-hangzhou region.
 *
 * <p>The AccessKey pair is read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
 * ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final DescribeClusterRequest request = new DescribeClusterRequest();
    request.setId("C-XXXXXXXXXXXXXXXX"); // Specify the ID of the cluster.
    try {
        DescribeClusterResponse response = client.getAcsResponse(request);
        DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
        // Perform operations on the cluster.
    } catch (Exception e) {
        // Do not swallow the failure. Rethrow it with the original exception as the cause.
        throw new IllegalStateException("Failed to query the details of the EMR cluster", e);
    }
}

Query the list of clusters

/**
 * Lists EMR clusters in the cn-hangzhou region: first page, 20 clusters per page,
 * in descending order.
 *
 * <p>The AccessKey pair is read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
 * ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final ListClustersRequest request = new ListClustersRequest();
    request.setPageNumber(1); // Page numbers start at 1.
    request.setIsDesc(true);
    request.setPageSize(20);
    try {
        ListClustersResponse response = client.getAcsResponse(request);
        List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
        for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
            // Perform operations on the cluster.
        }
    } catch (Exception e) {
        // Do not swallow the failure. Rethrow it with the original exception as the cause.
        throw new IllegalStateException("Failed to list the EMR clusters", e);
    }
}

Release a cluster

/**
 * Releases the EMR cluster with the specified ID in the cn-hangzhou region.
 *
 * <p>The AccessKey pair is read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
 * ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    ReleaseClusterRequest request = new ReleaseClusterRequest();
    request.setId("C-XXXXXXXXXXXXXXXX"); // Specify the ID of the cluster.
    try {
        ReleaseClusterResponse response = client.getAcsResponse(request);
        // Inspect the response if needed.
    } catch (Exception e) {
        // Do not swallow the failure. A silently ignored error here would leave the
        // cluster running without any indication.
        throw new IllegalStateException("Failed to release the EMR cluster", e);
    }
}

Create a job

/**
 * Creates a SPARK job that runs com.test.RemoteDebug from a JAR file stored in OSS.
 *
 * <p>The AccessKey pair is read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
 * ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final CreateJobRequest request = new CreateJobRequest();
    request.setName("Your-Job-Name");
    // spark-submit style arguments. The trailing "1000" is passed to the main class as
    // its program argument. There is no stray quote after it; the original sample
    // appended a literal '"' to this argument.
    request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000");
    request.setFailAct("CONTINUE"); // If the job fails, skip the job and proceed to subsequent operations.
    request.setType("SPARK"); // Specify the job type.
    try {
        CreateJobResponse response = client.getAcsResponse(request);
        String jobId = response.getId(); // ID of the new job.
    } catch (Exception e) {
        // Do not swallow the failure. Rethrow it with the original exception as the cause.
        throw new IllegalStateException("Failed to create the EMR job", e);
    }
}

Delete a job

Important

If the job that you want to delete is used in a workflow, you must first delete the workflow or modify it so that it no longer uses the job.

/**
 * Deletes the EMR job with the specified ID.
 *
 * <p>The AccessKey pair is read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
 * ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final DeleteJobRequest request = new DeleteJobRequest();
    request.setId("J-XXXXXXXXXXXXXXXX"); // Specify the ID of the job.
    try {
        DeleteJobResponse response = client.getAcsResponse(request);
        // Inspect the response if needed.
    } catch (Exception e) {
        // Do not swallow the failure. For example, deleting a job that a workflow
        // still uses should be reported, not silently ignored.
        throw new IllegalStateException("Failed to delete the EMR job", e);
    }
}

Scale out a node group

You can increase the number of nodes in a node group to scale out the node group.

IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);

// Describe the node group to scale out.
ResizeClusterV2Request.HostGroup hostGroup = new ResizeClusterV2Request.HostGroup();
hostGroup.setHostGroupId("G-F0D0661E0A6E****");
// Specify the number of nodes that you want to add to the node group.
hostGroup.setNodeCount(1);

List<ResizeClusterV2Request.HostGroup> hostGroups = new ArrayList<>();
hostGroups.add(hostGroup);

// Attach the node group list to a resize request for the target cluster.
ResizeClusterV2Request request = new ResizeClusterV2Request();
request.setClusterId("C-0E4B90219*****");
request.setHostGroups(hostGroups);

System.out.println(JSON.toJSONString(client.getAcsResponse(request)));

Scale in a node group

You can decrease the number of nodes in a node group to scale in the node group. You can also specify the IDs of nodes that you want to remove to scale in a node group.

Decrease the number of nodes in a node group to scale in the node group

Important

Before you scale in a node group by using this method, update EMR SDK for Java to version 3.3.8 by using the following Maven dependency:

<!-- EMR SDK for Java. Version 3.3.8 is the version that the note above requires for scaling in a node group by node count. -->
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>aliyun-java-sdk-emr</artifactId>
  <version>3.3.8</version>
</dependency>
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);

// Scale in a node group by removing a given number of nodes from it.
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Specify the ID of the cluster.
request.setClusterId("C-01A1F4A********");
// Specify the ID of the node group. You can call the ListClusterHostGroup operation to obtain the ID of the node group.
request.setHostGroupId("G-D11D3E*******");

// Specify the number of nodes that you want to remove.
request.setReleaseNumber(3);
// Enable graceful decommissioning.
request.setEnableGracefulDecommission(true);
// Decommission timeout. Unit: seconds.
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));

If graceful decommissioning is enabled for YARN, set the yarn.resourcemanager.nodes.exclude-path parameter to /etc/ecm/hadoop-conf/yarn-exclude.xml on the Configure tab of the YARN service in the EMR console. Then, save and deploy the configuration on the same tab. Alternatively, you can use the following code to change the value of the parameter:

IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);

// yarn-site setting that points the ResourceManager node exclude list at the file used for graceful decommissioning.
String yarnSiteParams = "{\"yarn-site\":{\"yarn.resourcemanager.nodes.exclude-path\":\"/etc/ecm/hadoop-conf/yarn-exclude.xml\"}}";

ModifyClusterServiceConfigRequest configRequest = new ModifyClusterServiceConfigRequest();
configRequest.setClusterId("C-01A1F4A********");
configRequest.setRegionId("cn-hangzhou");
configRequest.setServiceName("YARN");
configRequest.setConfigParams(yarnSiteParams);
configRequest.setCustomConfigParams("{}"); // No custom configuration items.
configRequest.setComment("for decommission gracefully");
configRequest.setRefreshHostConfig(Boolean.TRUE);
System.out.println(JSON.toJSONString(client.getAcsResponse(configRequest)));

Specify the IDs of the nodes that you want to remove to scale in a node group

IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);

// Scale in a node group by removing specific nodes, identified by instance ID.
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Specify the ID of the cluster.
request.setClusterId("C-C52CF4246D10****");
// Specify the ID of the node group. You can call the ListClusterHostGroup operation to obtain the ID of the node group.
request.setHostGroupId("G-A24651D939AD****");

// Specify the IDs of the nodes that you want to remove.
List<String> instanceIds = new ArrayList<>();
instanceIds.add("i-xxxxxxx");
instanceIds.add("i-xxxxxxy");

// The instance ID list is passed as a JSON-serialized string.
request.setInstanceIdList(JSON.toJSONString(instanceIds));

// Enable graceful decommissioning.
request.setEnableGracefulDecommission(true);
// Decommission timeout. Unit: seconds.
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));