edit-icon download-icon

Quick start

Last Updated: May 04, 2017

Environment preparation

Create a Maven project and add the following Maven dependencies:

  1. <dependency>
  2. <groupId>com.aliyun</groupId>
  3. <artifactId>aliyun-java-sdk-core</artifactId>
  4. <version>2.3.9</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.aliyun</groupId>
  8. <artifactId>aliyun-java-sdk-emr</artifactId>
  9. <version>2.2.2</version>
  10. </dependency>

Or you can download the corresponding JAR file to a local path directly. Taking Eclipse as an example, the procedure is as follows:

  1. Download the JAR file.

    aliyun-java-sdk-core-2.3.9.jar

    aliyun-java-sdk-emr-2.2.2.jar

  2. Copy the downloaded file to your project.

  3. In Eclipse, right click your project name and click Properties > Java Build Path > Add JARs.

  4. Select all the JAR files that you have copied in Step 2.

After you complete the preceding steps, you can use the Alibaba Cloud E-MapReduce OpenAPI Java SDK in Eclipse.

Client initialization

  1. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
  2. DefaultAcsClient client = new DefaultAcsClient(profile);

All E-MapReduce operations in the SDK are performed through this client.

Sample code

Cluster

  • Create a cluster

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. final CreateClusterRequest request = new CreateClusterRequest();
    5. request.setRegionId("cn-hangzhou"); // set region id
    6. request.setName("Your-Cluster-Name");
    7. // if you did not specify security group id, it will create a new security group with given name
    8. request.setSecurityGroupId("Your-Security-Group-Id"); // (1)
    9. request.setAutoRenew(false);
    10. request.setChargeType("PostPaid"); // PostPaid or PrePaid
    11. request.setClusterType("HADOOP"); // HADOOP or HBase (2)
    12. request.setEmrVer("EMR-1.3.0"); // emr image version
    13. request.setIsOpenPublicIp(true);
    14. request.setLogEnable(true);
    15. request.setLogPath("oss://Your-Bucket/Your-Folder");
    16. request.setMasterPwdEnable(true); // enable master password
    17. request.setMasterPwd("Aa123456789"); // set master node's password
    18. request.setZoneId("cn-hangzhou-b"); // set zone
    19. // IO optimization parameters. ECS series and network types decide the available hardware configurations (ECS instance types and cloud disk types).
    20. // For details, you can refer to the available combinations and supported types on the ECS purchase page.
    21. // https://ecs.console.aliyun.com/#/create/postpay/
    22. request.setIoOptimized(true); // Set the IO optimization parameters
    23. request.setInstanceGeneration("ecs-2"); // Set to ECS II series, ECS-1/ECS-2.
    24. request.setNetType("classic"); // Set the network type, classic/vpc.
    25. List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
    26. CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
    27. masterOrder.setIndex(1);
    28. masterOrder.setDiskCapacity(50);
    29. masterOrder.setDiskCount(2);
    30. masterOrder.setDiskType("CLOUD_EFFICIENCY"); // specify disk type (2)
    31. masterOrder.setInstanceType("ecs.n1.large"); // specify ecs instance type
    32. masterOrder.setNodeCount(1);
    33. masterOrder.setNodeType("MASTER"); // MASTER or CORE (2)
    34. ecsOrders.add(masterOrder);
    35. CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
    36. coreOrder.setIndex(2);
    37. coreOrder.setDiskCapacity(50);
    38. coreOrder.setDiskCount(4);
    39. coreOrder.setDiskType("CLOUD_EFFICIENCY");
    40. coreOrder.setInstanceType("ecs.n1.large");
    41. coreOrder.setNodeCount(3);
    42. coreOrder.setNodeType("CORE");
    43. ecsOrders.add(coreOrder);
    44. request.setEcsOrders(ecsOrders);
    45. try {
    46. CreateClusterResponse response = client.getAcsResponse(request);
    47. String clusterId = response.getClusterId(); // cluster id
    48. // TODO do something with this cluster
    49. } catch (Exception e) {
    50. // TODO do something
    51. }
    52. }
    1. To create a cluster, you need to specify the security group that the cluster belongs to. If the security group ID is not specified, you need to specify a security group name instead, and a new security group with that name is created when the cluster is created.

    2. For specific enumeration values, refer to here.

    3. The sample code above creates a cluster in the classic network. If you want to create a cluster in a VPC network, you need to set the network type in the request to vpc and specify the VPC ID and VSwitch ID, as shown below:

      1. request.setNetType("vpc"); // Set the network type, classic/vpc.
      2. request.setVpcId("your-vpcId");
      3. request.setVSwitchId("your-switchId");
    4. Set the high availability parameters. For instructions on high availability parameters, see Create a cluster > Hardware configuration.

      1. request.setHighAvailabilityEnable(true);
    5. Set the available software components. For instructions on available software components, see Create Cluster > Software configuration.

      1. List<String> soft = new ArrayList<String>();
      2. soft.add("presto");
      3. soft.add("oozie");
      4. request.setOptionSoftWareLists(soft);
    6. To set the configuration item, see here.

      1. request.setConfigurations("oss://your-bucket/your-conf.json");
    7. To set the bootstrap actions, see here.

      1. List<CreateClusterRequest.BootstrapAction> bootstrapActionLists = new ArrayList<CreateClusterRequest.BootstrapAction>();
      2. CreateClusterRequest.BootstrapAction bootstrapActionList = new CreateClusterRequest.BootstrapAction();
      3. bootstrapActionList.setName("bootstrapName");
      4. bootstrapActionList.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
      5. bootstrapActionList.setArg("instance.isMaster=true mkdir -p /tmp/abc");
      6. bootstrapActionLists.add(bootstrapActionList);
      7. request.setBootstrapActions(bootstrapActionLists);
  • Query the cluster details

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. final DescribeClusterRequest request = new DescribeClusterRequest();
    5. request.setRegionId("cn-hangzhou"); // set region id
    6. request.setId("C-XXXXXXXXXXXXXXXX"); // cluster id
    7. try {
    8. DescribeClusterResponse response = client.getAcsResponse(request);
    9. DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
    10. // TODO do something with this cluster
    11. } catch (Exception e) {
    12. // TODO do something
    13. }
    14. }
  • Query the cluster list

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. final ListClustersRequest request = new ListClustersRequest();
    5. request.setRegionId("cn-hangzhou"); // set region id
    6. request.setPageNumber(1);
    7. request.setIsDesc(true);
    8. request.setPageSize(20);
    9. try {
    10. ListClustersResponse response = client.getAcsResponse(request);
    11. List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
    12. for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
    13. // TODO do something with this cluster
    14. }
    15. } catch (Exception e) {
    16. // TODO do something
    17. }
    18. }
  • Release a cluster

  1. public static void main(String[] args) {
  2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
  3. DefaultAcsClient client = new DefaultAcsClient(profile);
  4. ReleaseClusterRequest request = new ReleaseClusterRequest();
  5. request.setId("C-XXXXXXXXXXXXXXXX"); // specify the cluster id you want to release.
  6. try {
  7. ReleaseClusterResponse response = client.getAcsResponse(request);
  8. } catch (Exception e) {
  9. // TODO do something
  10. }
  11. }

Job

  • Create a job

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. final CreateJobRequest request = new CreateJobRequest();
    5. request.setRegionId("cn-hangzhou"); // set region id
    6. request.setName("Your-Job-Name");
    7. request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000");
    8. request.setFailAct("CONTINUE"); // STOP or CONTINUE
    9. request.setType("SPARK"); // SPARK or HADOOP or HIVE or PIG
    10. try {
    11. CreateJobResponse response = client.getAcsResponse(request);
    12. String jobId = response.getId();
    13. // TODO do something with this job
    14. } catch (Exception e) {
    15. // TODO do something
    16. }
    17. }
  • Delete a job

    Note: If a job is used by another execution plan, it cannot be deleted. You need to first delete or modify the corresponding execution plan.

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. final DeleteJobRequest request = new DeleteJobRequest();
    5. request.setRegionId("cn-hangzhou"); // set region id
    6. request.setId("J-XXXXXXXXXXXXXXXX"); // set job id
    7. try {
    8. DeleteJobResponse response = client.getAcsResponse(request);
    9. } catch (Exception e) {
    10. // TODO do something
    11. }
    12. }

Execution plan

  • Create an execution plan

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. final CreateExecutionPlanRequest request = new CreateExecutionPlanRequest();
    5. request.setRegionId("cn-hangzhou"); // set region id
    6. request.setName("Your-ExecutionPlan-Name");
    7. request.setCreateClusterOnDemand(false);
    8. request.setStrategy("RUN_MANUALLY"); // RUN_MANUALLY or SCHEDULE
    9. request.setClusterId("C-XXXXXXXXXXXXXXXX"); // specify an existing running cluster
    10. List<String> jobIds = new ArrayList<String>();
    11. jobIds.add("J-XXXXXXXXXXXXXXXX"); // specify a job
    12. request.setJobIdLists(jobIds);
    13. try {
    14. CreateExecutionPlanResponse response = client.getAcsResponse(request);
    15. String executionPlanId = response.getId();
    16. // TODO do something with this execution plan
    17. } catch (Exception e) {
    18. // TODO do something
    19. }
    20. }

    The code above creates an execution plan for manual execution (non-periodic scheduling), and the execution plan is associated with an existing cluster.

    If you want to create an execution plan for periodic scheduling, you need to modify or add the following code:

    1. request.setStrategy("SCHEDULE"); // RUN_MANUALLY or SCHEDULE
    2. request.setStartTime(new Date().getTime()); // set start time
    3. request.setTimeUnit("DAY"); // DAY or HOUR
    4. request.setTimeInterval(1); // set time interval

    If you want to create an execution plan for on-demand cluster creation, you need to modify or add the following code:

    1. request.setCreateClusterOnDemand(true);
    2. request.setClusterType("HADOOP");
    3. request.setClusterName("Your-Cluster-Name");
    4. request.setEmrVer("EMR-1.3.0");
    5. request.setSecurityGroupId("Your-Security-Group-Id");
    6. request.setIsOpenPublicIp(true);
    7. // IO optimization parameters. ECS series and network types decide the available hardware configurations (ECS instance types and cloud disk types).
    8. // For details, you can refer to the available combinations and supported types on the ECS purchase page.
    9. // https://ecs.console.aliyun.com/#/create/postpay/
    10. request.setIoOptimized(true); // Set the IO optimization parameters
    11. request.setInstanceGeneration("ecs-2"); // Set to ECS II series, ECS-1/ECS-2.
    12. request.setNetType("classic"); // Set the network type, classic/vpc.
    13. request.setLogEnable(true);
    14. request.setLogPath("oss://xxx");
    15. request.setEcsOrders(); // TODO: refer to the parameter settings for cluster creation. Note: the EcsOrder type here is CreateExecutionPlanRequest.EcsOrder, which is different from the CreateClusterRequest.EcsOrder used for cluster creation.

    The parameters above specify the configuration of the cluster. For specific parameter settings, you can refer to the logic of cluster creation. Each time an execution plan for on-demand cluster creation starts to run, it creates a temporary cluster according to this cluster configuration, runs the execution plan on it, and automatically releases the cluster when the execution plan is completed. Unlike the cluster creation logic, the cluster created here must have a security group ID specified (that is, you cannot create a security group by specifying the security group name).

    Of course, the periodic scheduling and on-demand cluster creation do not conflict with each other. You can create an execution plan for periodic scheduling and on-demand cluster creation.

  • Delete an execution plan

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. final DeleteExecutionPlanRequest request = new DeleteExecutionPlanRequest();
    5. request.setId("WF-XXXXXXXXXXXXXXXX"); // set execution plan id
    6. try {
    7. DeleteExecutionPlanResponse response = client.getAcsResponse(request);
    8. } catch (Exception e) {
    9. // TODO do something
    10. }
    11. }
  • Run an execution plan

    Note: You cannot run an execution plan that is in the scheduling or running status.

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. RunExecutionPlanRequest request = new RunExecutionPlanRequest();
    5. request.setRegionId("cn-hangzhou");
    6. request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the id of the execution plan to run
    7. try {
    8. RunExecutionPlanResponse response = client.getAcsResponse(request);
    9. String instanceId = response.getExecutionPlanInstanceId();
    10. // TODO do something with this instance
    11. } catch (Exception e) {
    12. // TODO do something
    13. }
    14. }
  • Suspend the execution plan scheduling

    If an execution plan for periodic scheduling is in the periodic scheduling status, you can suspend its scheduling through the SDK.

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. SuspendExecutionPlanSchedulerRequest request = new SuspendExecutionPlanSchedulerRequest();
    5. request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id you want to suspend
    6. request.setRegionId("cn-hangzhou"); // specify the region of this execution plan
    7. try {
    8. SuspendExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
    9. } catch (Exception e) {
    10. // TODO do something
    11. }
    12. }
  • Start the execution plan scheduling

    If an execution plan for periodic scheduling is in the periodic scheduling suspended status, you can resume its scheduling through the SDK.

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. ResumeExecutionPlanSchedulerRequest request = new ResumeExecutionPlanSchedulerRequest();
    5. request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id you want to resume
    6. request.setRegionId("cn-hangzhou"); // specify the region of this execution plan
    7. try {
    8. ResumeExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
    9. } catch (Exception e) {
    10. // TODO do something
    11. }
    12. }
  • Query the execution plan instance list

    The execution plan instance list is the execution history of an execution plan.

    1. public static void main(String[] args) {
    2. IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
    3. DefaultAcsClient client = new DefaultAcsClient(profile);
    4. ListExecutionPlanInstancesRequest request = new ListExecutionPlanInstancesRequest();
    5. request.setRegionId("cn-hangzhou");
    6. // specify execution plan ids
    7. List<String> executionPlanIds = new ArrayList<String>();
    8. executionPlanIds.add("WF-XXXXXXXXXXXXXXX1");
    9. executionPlanIds.add("WF-XXXXXXXXXXXXXXX2");
    10. executionPlanIds.add("WF-XXXXXXXXXXXXXXX3");
    11. request.setExecutionPlanIdLists(executionPlanIds); // (1)
    12. // specify order key (ordered by id)
    13. request.setIsDesc(true);
    14. // specify page number and page size, default page number is 1 and default page size is 10.
    15. request.setPageSize(20);
    16. request.setPageNumber(1);
    17. // specify if you want to list latest instance for each execution plan id.
    18. request.setOnlyLastInstance(true); // (2) default is false
    19. try {
    20. ListExecutionPlanInstancesResponse response = client.getAcsResponse(request);
    21. for (ListExecutionPlanInstancesResponse.ExecutionPlanInstance instance : response.getExecutionPlanInstances()) {
    22. // TODO do something with each instance
    23. }
    24. } catch (Exception e) {
    25. // TODO do something
    26. }
    27. }
    1. To query the execution history of the execution plan, you can specify multiple execution plan IDs.

    2. If you specify that only the last instance is queried, only the most recent execution record of each execution plan is returned instead of the full execution history. This is usually used to determine whether one or more execution plans have completed their most recent execution, or to query the status of that execution.

Thank you! We've received your feedback.