本节介绍使用 Java SDK 快速创建集群、作业、执行计划。

说明

OpenApI Explorer提供在线调用云产品 API、动态生成 SDK 示例代码和快速检索接口等功能,能显著降低使用API的难度,推荐您使用。

环境准备

创建一个 Maven 工程,添加 Maven 依赖,如下所示。
<dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>2.3.9</version>
       </dependency>
       <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-emr</artifactId>
            <version>2.2.2</version>
        </dependency>

或者直接下载对应的 JAR 文件到本地。以 Eclipse 为例,其操作步骤如下:

  1. 下载 JAR 文件。

    aliyun-java-sdk-core-2.3.9.jar

    aliyun-java-sdk-emr-2.2.2.jar

  2. 将下载后的文件拷贝到您的项目中。
  3. 在 Eclipse 中选择右击您的工程名称,然后单击 Properties > Java Build Path > Add JARs
  4. 选中您在步骤 2 中拷贝的所有 JAR 文件。

经过以上几步,您就可以在 Eclipse 项目中使用阿里云 E-MapReduce OpenAPI Java SDK。

初始化 Client

IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>"); DefaultAcsClient client = new DefaultAcsClient(profile);

SDK 中所有对 E-MapReduce 的操作都可以使用这个 client 来进行。

示例代码

  • 集群
    • 创建集群
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateClusterRequest request = new CreateClusterRequest();
            request.setName("Your-Cluster-Name");
            // 如果未指定安全组id,则它将创建具有给定名称的新安全组
            request.setSecurityGroupId("Your-Security-Group-Id"); // (1)
            request.setAutoRenew(false);
            request.setChargeType("PostPaid"); //后付费或者预付费
            request.setClusterType("HADOOP"); //HADOOP或HBase (2)
            request.setEmrVer("EMR-1.3.0"); //emr版本
            request.setIsOpenPublicIp(true);
            request.setLogEnable(true);
            request.setLogPath("oss://Your-Bucket/Your-Folder");
            request.setMasterPwdEnable(true); //启用主节点密码
            request.setMasterPwd("Aa123456789"); //设置主节点密码
            request.setZoneId("cn-hangzhou-b"); //设置区域
             // io优化参数,ecs系列以及网络类型会决定可选的硬件配置(ecs实例类型,云盘类型)
            // 详情情况可以参照ecs购买页面可以选择的组合和支持的类型来设置
            // https://ecs.console.aliyun.com/#/create/postpay/
            request.setIoOptimized(true); // 设置IO优化参数
            request.setInstanceGeneration("ecs-2"); // 设置为ecs II系列,ecs-1/ecs-2
            request.setNetType("classic"); // 设置网络类型 classic/vpc
            List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
            CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
            masterOrder.setIndex(1);
            masterOrder.setDiskCapacity(50);
            masterOrder.setDiskCount(2);
            masterOrder.setDiskType("CLOUD_EFFICIENCY"); //指定磁盘类型 (2)
            masterOrder.setInstanceType("ecs.n1.large"); //指定ecs实例类型
            masterOrder.setNodeCount(1);
            masterOrder.setNodeType("MASTER"); // 主节点或者核心节点 (2)
            ecsOrders.add(masterOrder);
            CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
            coreOrder.setIndex(2);
            coreOrder.setDiskCapacity(50);
            coreOrder.setDiskCount(4);
            coreOrder.setDiskType("CLOUD_EFFICIENCY");
            coreOrder.setInstanceType("ecs.n1.large");
            coreOrder.setNodeCount(3);
            coreOrder.setNodeType("CORE");
            ecsOrders.add(coreOrder);
            request.setEcsOrders(ecsOrders);
            try {
                CreateClusterResponse response = client.getAcsResponse(request);
                String clusterId = response.getClusterId(); // cluster id
                //对集群执行操作
            } catch (Exception e) {
      
            }
        }
      • 创建集群需要指定集群属于哪个安全组。如果不指定安全组 ID,则需要指定一个安全组名称,在创建集群的同时新建一个安全组。
      • 具体枚举取值,请参见:枚举类型
      • 上述示例代码中,是创建了一个经典网络的集群,如果需要创建 VPC 网络的集群,则需要将 request 中的网络类型设置为 vpc,并且指定 vpcid 和 vswitchid,如下所示:
        request.setNetType("vpc"); // 设置网络类型 classic/vpc
        request.setVpcId("your-vpcId");
        request.setVSwitchId("your-switchId");
      • 设置高可用参数,关于高可用参数的说明请参见创建集群的硬件配置部分。
        request.setHighAvailabilityEnable(true);
      • 设置可选软件组件,关于可选软件组件的说明请参见创建集群的软件配置章节。
        List<String> soft = new ArrayList<String>();
        soft.add("presto");
        soft.add("oozie");
        request.setOptionSoftWareLists(soft);
      • 设置配置项,请参见软件配置
        request.setConfigurations("oss://your-bucket/your-conf.json");
      • 设置引导操作,请参见引导操作
        List<CreateClusterRequest.BootstrapAction> bootstrapActionLists = new ArrayList<CreateClusterRequest.BootstrapAction>();
        CreateClusterRequest.BootstrapAction bootstrapActionList = new CreateClusterRequest.BootstrapAction();
        bootstrapActionList.setName("bootstrapName");
        bootstrapActionList.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
        bootstrapActionList.setArg("instance.isMaster=true mkdir -p /tmp/abc");
        bootstrapActionLists.add(bootstrapActionList);
        request.setBootstrapActions(bootstrapActionLists);
    • 查询集群详情
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final DescribeClusterRequest request = new DescribeClusterRequest();
            request.setId("C-XXXXXXXXXXXXXXXX"); //集群id
            try {
                DescribeClusterResponse response = client.getAcsResponse(request);
                DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
                //对集群执行操作
            } catch (Exception e) {
      
            }
        }
    • 查询集群列表
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final ListClustersRequest request = new ListClustersRequest();
                request.setPageNumber(1);
                request.setIsDesc(true);
                request.setPageSize(20);
                try {
                    ListClustersResponse response = client.getAcsResponse(request);
                    List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
                    for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
                        //对集群执行操作
                    }
                } catch (Exception e) {
      
                }
            }
    • 释放集群
      public static void main(String[] args) {
                  IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                  DefaultAcsClient client = new DefaultAcsClient(profile);
                  ReleaseClusterRequest request = new ReleaseClusterRequest();
                  request.setId("C-XXXXXXXXXXXXXXXX"); //指定要释放的群集id
                  try {
                      ReleaseClusterResponse response = client.getAcsResponse(request);
                  } catch (Exception e) {
      
                  }
              }
  • 作业
    • 创建作业
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateJobRequest request = new CreateJobRequest();
            request.setName("Your-Job-Name");
            request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000\"");
            request.setFailAct("CONTINUE"); //停止或继续
            request.setType("SPARK"); // SPARK、HADOOP、HIVE或PIG
      try {
                  CreateJobResponse response = client.getAcsResponse(request);
                  String jobId = response.getId();
              } catch (Exception e) {
      
              }
          }
    • 删除作业
      注意 如果一个作业被其他执行计划使用,则不能删除,需要先删除对应的执行计划或者修改对应的执行计划。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final DeleteJobRequest request = new DeleteJobRequest();
                request.setId("J-XXXXXXXXXXXXXXXX"); //设置作业id
                try {
                    DeleteJobResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
      
                }
            }
  • 执行计划
    • 创建执行计划
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final CreateExecutionPlanRequest request = new CreateExecutionPlanRequest();
                request.setName("Your-ExecutionPlan-Name");
                request.setCreateClusterOnDemand(false);
                request.setStrategy("RUN_MANUALLY"); //手动运行或计划
                request.setClusterId("C-XXXXXXXXXXXXXXXX"); //指定现有运行的群集
                List<String> jobIds = new ArrayList<String>();
                jobIds.add("J-XXXXXXXXXXXXXXXX"); //指定作业
                request.setJobIdLists(jobIds);
                try {
                    CreateExecutionPlanResponse response = client.getAcsResponse(request);
                    String executionPlanId = response.getId();
                    // 执行操作
                } catch (Exception e) {
      
                }
            }

      上述实例代码创建了一个手动执行(非周期调度的)执行计划,并且该执行计划关联了一个已经创建好的集群。

      如果需要创建一个周期调度的执行计划,则需要修改或者增加如下代码。
      request.setStrategy("SCHEDULE"); //手动运行或计划
                request.setStartTime(new Date().getTime()); //设置启动时间
                request.setTimeUnit("DAY"); // DAY或者HOUR
                request.setTimeInterval(1); //设置时间间隔
      如果需要创建一个按需创建集群的执行计划,则需要修改和增加如下代码。
      request.setCreateClusterOnDemand(true);
                request.setClusterType("HADOOP");
                request.setClusterName("Your-Cluster-Name");
                request.setEmrVer("EMR-1.3.0");
                request.setSecurityGroupId("Your-Security-Group-Id");
                request.setIsOpenPublicIp(true);
                 // io优化参数,ecs系列以及网络类型会决定可选的硬件配置(ecs实例类型,云盘类型)
                // 详情情况可以参照ecs购买页面可以选择的组合和支持的类型来设置
                // https://ecs.console.aliyun.com/#/create/postpay/
                request.setIoOptimized(true); // 设置IO优化参数
                request.setInstanceGeneration("ecs-2"); // 设置为ecs II系列,ecs-1/ecs-2
                request.setNetType("classic"); // 设置网络类型 classic/vpc
                request.setLogEnable(true);
                request.setLogPath("oss://xxx");
                request.setEcsOrders(); // TODO 参考创建集群的参数设置方式。注意,这里的 ecsOder 的类型为 CreateExecutionPlanRequest.EcsOrder,与创建集群的 CreateClusterRequest.EcsOrder 不同。

      通过上面的参数来指定一个集群的配置,具体的参数设置可以参考创建集群的逻辑。按需创建集群的执行计划,会在每次执行计划启动的时候按照设定的集群配置去新建一个临时集群来运行执行计划,执行计划完成之后该集群会自动释放。与创建集群逻辑稍有不同的是,这里创建的集群必须指定一个安全组 ID(即不能指定安全组名称来新建一个安全组)。

      当然,周期调度和按需创建集群并不矛盾,即可以创建一个执行计划,它是周期调度的并且是按需创建集群的。

    • 删除执行计划
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final DeleteExecutionPlanRequest request = new DeleteExecutionPlanRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); //设置执行计划id
                try {
                    DeleteExecutionPlanResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
      
                }
            }
    • 运行执行计划
      注意 处于调度中或者正在运行的执行计划不能运行。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                RunExecutionPlanRequest request = new RunExecutionPlanRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id which to run
                try {
                    RunExecutionPlanResponse response = client.getAcsResponse(request);
                    String instanceId = response.getExecutionPlanInstanceId();
                    // 对该实例执行操作
                } catch (Exception e) {
      
                }
            }
    • 暂停执行计划调度
      对于周期性的执行计划,如果正处于周期调度中(如下图所示),可以通过暂停执行计划的 SDK 将调度中的周期执行计划暂停。集群管理
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                SuspendExecutionPlanSchedulerRequest request = new SuspendExecutionPlanSchedulerRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // 指定要中止的执行计划id
                try {
                    SuspendExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
      
                }
            }
    • 启动执行计划调度
      对于周期性的执行计划,如果正处于暂停调度状态中(如下图所示),可以通过启动执行计划调度的 SDK 将暂停中的周期执行计划启动调度。集群管理2
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                ResumeExecutionPlanSchedulerRequest request = new ResumeExecutionPlanSchedulerRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); //指定要中止的执行计划id
                try {
                    ResumeExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
      
                }
            }
    • 查询执行计划实例列表
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                ListExecutionPlanInstancesRequest request = new ListExecutionPlanInstancesRequest();
                //指定执行计划ID
                List<String> executionPlanIds = new ArrayList<String>();
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX1");
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX2");
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX3");
                request.setExecutionPlanIdLists(executionPlanIds); // (1)
                //指定顺序键(按id排序)
                request.setIsDesc(true);
                //指定页码和页面大小,默认页码为1,默认页面大小为10。
                request.setPageSize(20);
                request.setPageNumber(1);
                // 指定是否要列出每个执行计划id的最新实例。
                request.setOnlyLastInstance(true); // (2) 默认是false
                try {
                    ListExecutionPlanInstancesResponse response = client.getAcsResponse(request);
                    for (ListExecutionPlanInstancesResponse.ExecutionPlanInstance instance : response.getExecutionPlanInstances()) {
                        // 对每个实例进行操作
                    }
                } catch (Exception e) {
      
                }
            }
      • 查询执行计划的执行历史纪录,可以指定多个执行计划 ID。
      • 如果指定了只查上一次执行纪录,则会返回指定执行计划的上一次执行纪录,不会返回所有执行纪录数据。通常用于判断某个或者某些执行计划上次执行是否执行完成,或者查询上次执行的执行状态。