本文介绍如何使用Java SDK快速创建集群和创建作业。
说明 OpenApI Explorer提供在线调用云产品API、动态生成SDK示例代码和快速检索接口等功能,能显著降低使用API的难度,推荐您使用。
环境准备
在Eclipse项目中使用阿里云E-MapReduce OpenAPI Java SDK。
- 创建一个Maven工程,添加Maven依赖。
<dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>2.3.9</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-emr</artifactId> <version>2.2.2</version> </dependency>
- 下载对应的JAR文件到本地。以Eclipse为例,其操作步骤如下:
- 下载JAR文件。
- 将下载后的文件拷贝到您的项目中。
- 在Eclipse中选择右键单击您的工程名称,然后选择 。
- 选中您在步骤2中拷贝的所有JAR文件。
初始化Client
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
DefaultAcsClient client = new DefaultAcsClient(profile);
示例代码
- 集群
- 创建集群
public static void main(String[] args) { IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>"); DefaultAcsClient client = new DefaultAcsClient(profile); final CreateClusterRequest request = new CreateClusterRequest(); request.setName("Your-Cluster-Name"); request.setSecurityGroupId("Your-Security-Group-Id"); //如果未指定安全组ID,则将创建具有给定名称的新安全组。 request.setAutoRenew(false); request.setChargeType("PostPaid"); //付费类型,按量付费。 request.setClusterType("HADOOP"); //集群类型。 request.setEmrVer("EMR-1.3.0"); //EMR版本。 request.setIsOpenPublicIp(true); request.setLogEnable(true); request.setLogPath("oss://Your-Bucket/Your-Folder"); request.setMasterPwdEnable(true); //启用主节点密码。 request.setMasterPwd("Aa123456789"); //设置主节点密码。 request.setZoneId("cn-hangzhou-b"); //设置地域。 // io优化参数,ecs系列以及网络类型会决定可选的硬件配置(ecs实例类型,云盘类型),详情情况可以参照ecs购买页面可以选择的组合和支持的类型来设置。 request.setIoOptimized(true); // 设置IO优化参数。 request.setInstanceGeneration("ecs-2"); // 设置为ecs II系列,取值支持ecs-1和ecs-2。 request.setNetType("vpc"); // 设置网络类型。 request.setVpcId("your-vpcId"); request.setVSwitchId("your-switchId"); List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>(); CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder(); masterOrder.setIndex(1); masterOrder.setDiskCapacity(50); masterOrder.setDiskCount(2); masterOrder.setDiskType("CLOUD_EFFICIENCY"); //指定磁盘类型。 masterOrder.setInstanceType("ecs.n1.large"); //指定ecs实例类型。 masterOrder.setNodeCount(1); masterOrder.setNodeType("MASTER"); // 主节点。 ecsOrders.add(masterOrder); CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder(); coreOrder.setIndex(2); coreOrder.setDiskCapacity(50); coreOrder.setDiskCount(4); coreOrder.setDiskType("CLOUD_EFFICIENCY"); coreOrder.setInstanceType("ecs.n1.large"); coreOrder.setNodeCount(3); coreOrder.setNodeType("CORE"); ecsOrders.add(coreOrder); request.setEcsOrders(ecsOrders); try { CreateClusterResponse response = client.getAcsResponse(request); String clusterId = response.getClusterId(); // cluster id //对集群执行操作。 } catch (Exception e) { } }
- 创建集群需要指定集群所属的安全组。如果不指定安全组ID,则需要指定一个安全组名称,在创建集群的同时新建一个安全组。
- 设置高可用参数,详情请参见创建集群的硬件配置部分。
request.setHighAvailabilityEnable(true);
- 设置可选软件组件,详情请参见创建集群的软件配置章节。
List<String> soft = new ArrayList<String>(); soft.add("presto"); soft.add("oozie"); request.setOptionSoftWareLists(soft);
- 设置配置项,详情请参见软件配置。
request.setConfigurations("oss://your-bucket/your-conf.json");
- 设置引导操作,详情请参见引导操作。
List<CreateClusterRequest.BootstrapAction> bootstrapActionLists = new ArrayList<CreateClusterRequest.BootstrapAction>(); CreateClusterRequest.BootstrapAction bootstrapActionList = new CreateClusterRequest.BootstrapAction(); bootstrapActionList.setName("bootstrapName"); bootstrapActionList.setPath("oss://emr-agent-pack/bootstrap/run-if.py"); bootstrapActionList.setArg("instance.isMaster=true mkdir -p /tmp/abc"); bootstrapActionLists.add(bootstrapActionList); request.setBootstrapActions(bootstrapActionLists);
- 查询集群详情
public static void main(String[] args) { IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>"); DefaultAcsClient client = new DefaultAcsClient(profile); final DescribeClusterRequest request = new DescribeClusterRequest(); request.setId("C-XXXXXXXXXXXXXXXX"); //集群ID。 try { DescribeClusterResponse response = client.getAcsResponse(request); DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo(); //对集群执行操作。 } catch (Exception e) { } }
- 查询集群列表
public static void main(String[] args) { IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>"); DefaultAcsClient client = new DefaultAcsClient(profile); final ListClustersRequest request = new ListClustersRequest(); request.setPageNumber(1); request.setIsDesc(true); request.setPageSize(20); try { ListClustersResponse response = client.getAcsResponse(request); List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters(); for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) { //对集群执行操作。 } } catch (Exception e) { } }
- 释放集群
public static void main(String[] args) { IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>"); DefaultAcsClient client = new DefaultAcsClient(profile); ReleaseClusterRequest request = new ReleaseClusterRequest(); request.setId("C-XXXXXXXXXXXXXXXX"); //指定要释放的群集ID。 try { ReleaseClusterResponse response = client.getAcsResponse(request); } catch (Exception e) { } }
- 创建集群
- 作业
- 创建作业
public static void main(String[] args) { IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>"); DefaultAcsClient client = new DefaultAcsClient(profile); final CreateJobRequest request = new CreateJobRequest(); request.setName("Your-Job-Name"); request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000\""); request.setFailAct("CONTINUE"); //继续作业。 request.setType("SPARK"); //作业类型。
try { CreateJobResponse response = client.getAcsResponse(request); String jobId = response.getId(); } catch (Exception e) { } }
- 删除作业
注意 如果一个作业被其他工作流使用,则不能删除,需要先删除对应的工作流或者修改对应的工作流。
public static void main(String[] args) { IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>"); DefaultAcsClient client = new DefaultAcsClient(profile); final DeleteJobRequest request = new DeleteJobRequest(); request.setId("J-XXXXXXXXXXXXXXXX"); //设置作业ID。 try { DeleteJobResponse response = client.getAcsResponse(request); } catch (Exception e) { } }
- 创建作业