This topic describes how to use E-MapReduce (EMR) SDK for Java to perform common operations. For example, you can create clusters, create jobs, and scale in or scale out node groups.
Create a cluster
/**
 * Creates a pay-as-you-go HADOOP cluster in a VPC with one master node and three core nodes.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

    final CreateClusterRequest request = new CreateClusterRequest();
    request.setName("Your-Cluster-Name");
    request.setSecurityGroupId("Your-Security-Group-Id"); // If no security group ID is specified, a new security group with a given name is created.
    request.setAutoRenew(false);
    request.setChargeType("PostPaid"); // Specify the billing method. In this example, the pay-as-you-go billing method is used.
    request.setClusterType("HADOOP"); // Specify the cluster type.
    request.setEmrVer("EMR-1.3.0"); // Specify the EMR version.
    request.setIsOpenPublicIp(true);
    request.setLogEnable(true);
    request.setLogPath("oss://Your-Bucket/Your-Folder");
    request.setMasterPwdEnable(true); // Enable password logon for the master node.
    request.setMasterPwd("Aa123456789"); // Specify the password for the master node.
    request.setZoneId("cn-hangzhou-b"); // Specify the zone ID.

    // The availability of hardware configurations, such as ECS instance types and cloud disk types, is determined based on the specified I/O optimization parameter, ECS instance series, and network type. For more information, visit the ECS buy page.
    request.setIoOptimized(true); // Set this parameter to true to enable I/O optimization.
    request.setInstanceGeneration("ecs-2"); // Set this parameter to ecs-2. Valid values: ecs-1 and ecs-2.
    request.setNetType("vpc"); // Specify the network type.
    request.setVpcId("your-vpcId");
    request.setVSwitchId("your-switchId");

    List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();

    // Hardware order for the master node group.
    CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
    masterOrder.setIndex(1);
    masterOrder.setDiskCapacity(50);
    masterOrder.setDiskCount(2);
    masterOrder.setDiskType("CLOUD_EFFICIENCY"); // Specify the disk type.
    masterOrder.setInstanceType("ecs.n1.large"); // Specify the ECS instance type.
    masterOrder.setNodeCount(1);
    masterOrder.setNodeType("MASTER"); // Specify the node type. In this example, the master node is used.
    ecsOrders.add(masterOrder);

    // Hardware order for the core node group.
    CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
    coreOrder.setIndex(2);
    coreOrder.setDiskCapacity(50);
    coreOrder.setDiskCount(4);
    coreOrder.setDiskType("CLOUD_EFFICIENCY");
    coreOrder.setInstanceType("ecs.n1.large");
    coreOrder.setNodeCount(3);
    coreOrder.setNodeType("CORE");
    ecsOrders.add(coreOrder);

    request.setEcsOrders(ecsOrders);

    try {
        CreateClusterResponse response = client.getAcsResponse(request);
        String clusterId = response.getClusterId(); // cluster id
        // Perform operations on the cluster.
    } catch (Exception e) {
        // Report the failure instead of silently discarding it.
        System.err.println("Failed to create the cluster: " + e.getMessage());
        e.printStackTrace();
    }
}
- When you create a cluster, specify a security group for the cluster. If you do not specify the ID of a security group, you must specify a security group name. This way, a security group is automatically created during cluster creation.
- Specify whether to enable high availability. For more information, see the hardware configurations
section in Create a cluster.
// Enable high availability for the cluster that this request creates.
request.setHighAvailabilityEnable(true);
- Configure the settings of optional software components. For more information, see the
software configurations section in Create a cluster.
// Optional software components to install on the cluster.
List<String> optionalSoftware = new ArrayList<>();
optionalSoftware.add("presto");
optionalSoftware.add("oozie");
request.setOptionSoftWareLists(optionalSoftware);
- Specify other configuration items. For more information, see Customize software configurations.
// Point the request at a JSON file in OSS that holds the custom software configurations.
request.setConfigurations("oss://your-bucket/your-conf.json");
- Configure bootstrap actions. For more information, see Bootstrap actions.
// Describe one bootstrap action: its name, the script to run, and the script arguments.
CreateClusterRequest.BootstrapAction bootstrapAction = new CreateClusterRequest.BootstrapAction();
bootstrapAction.setName("bootstrapName");
bootstrapAction.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
bootstrapAction.setArg("instance.isMaster=true mkdir -p /tmp/abc");

// Attach the bootstrap action to the cluster creation request.
List<CreateClusterRequest.BootstrapAction> bootstrapActions = new ArrayList<>();
bootstrapActions.add(bootstrapAction);
request.setBootstrapActions(bootstrapActions);
Query the details of a cluster
/**
 * Queries the details of a single cluster by its ID.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

    final DescribeClusterRequest request = new DescribeClusterRequest();
    request.setId("C-XXXXXXXXXXXXXXXX"); // Specify the ID of the cluster.

    try {
        DescribeClusterResponse response = client.getAcsResponse(request);
        DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
        // Perform operations on the cluster.
    } catch (Exception e) {
        // Report the failure instead of silently discarding it.
        System.err.println("Failed to query the cluster details: " + e.getMessage());
        e.printStackTrace();
    }
}
Query the list of clusters
/**
 * Queries a page of clusters: page 1, with a page size of 20 and the descending flag set.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

    final ListClustersRequest request = new ListClustersRequest();
    request.setPageNumber(1);
    request.setIsDesc(true);
    request.setPageSize(20);

    try {
        ListClustersResponse response = client.getAcsResponse(request);
        List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
        for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
            // Perform operations on the clusters.
        }
    } catch (Exception e) {
        // Report the failure instead of silently discarding it.
        System.err.println("Failed to query the list of clusters: " + e.getMessage());
        e.printStackTrace();
    }
}
Release a cluster
/**
 * Releases the cluster that has the specified ID.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

    ReleaseClusterRequest request = new ReleaseClusterRequest();
    request.setId("C-XXXXXXXXXXXXXXXX"); // Specify the ID of the cluster.

    try {
        ReleaseClusterResponse response = client.getAcsResponse(request);
    } catch (Exception e) {
        // Report the failure instead of silently discarding it.
        System.err.println("Failed to release the cluster: " + e.getMessage());
        e.printStackTrace();
    }
}
Create a job
/**
 * Creates a SPARK job and reads the ID of the new job from the response.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

    final CreateJobRequest request = new CreateJobRequest();
    request.setName("Your-Job-Name");
    // The run parameter must not end with a stray double quotation mark; an unbalanced quote
    // would be passed on as part of the job arguments.
    request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000");
    request.setFailAct("CONTINUE"); // If the job fails, skip the job and proceed to subsequent operations.
    request.setType("SPARK"); // Specify the job type.

    try {
        CreateJobResponse response = client.getAcsResponse(request);
        String jobId = response.getId();
    } catch (Exception e) {
        // Report the failure instead of silently discarding it.
        System.err.println("Failed to create the job: " + e.getMessage());
        e.printStackTrace();
    }
}
Delete a job
Important: If you want to delete a job that is used in a workflow, you must first delete or modify
the workflow.
/**
 * Deletes the job that has the specified ID.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

    final DeleteJobRequest request = new DeleteJobRequest();
    request.setId("J-XXXXXXXXXXXXXXXX"); // Specify the ID of the job.

    try {
        DeleteJobResponse response = client.getAcsResponse(request);
    } catch (Exception e) {
        // Report the failure instead of silently discarding it.
        System.err.println("Failed to delete the job: " + e.getMessage());
        e.printStackTrace();
    }
}
Scale out a node group
You can increase the number of nodes in a node group to scale out the node group.
// Scale out a node group by adding nodes to it.
IClientProfile clientProfile = DefaultProfile.getProfile("cn-hangzhou", "LTAITY****", "2Sfp******");
DefaultAcsClient acsClient = new DefaultAcsClient(clientProfile);

ResizeClusterV2Request resizeRequest = new ResizeClusterV2Request();
resizeRequest.setClusterId("C-0E4B90219*****");

// Describe the node group that you want to scale out.
ResizeClusterV2Request.HostGroup targetGroup = new ResizeClusterV2Request.HostGroup();
targetGroup.setHostGroupId("G-F0D0661E0A6E****");
// Specify the number of nodes that you want to add to the node group.
targetGroup.setNodeCount(1);

List<ResizeClusterV2Request.HostGroup> groupsToResize = new ArrayList<>();
groupsToResize.add(targetGroup);
resizeRequest.setHostGroups(groupsToResize);

// Send the request and print the response as JSON.
String responseJson = JSON.toJSONString(acsClient.getAcsResponse(resizeRequest));
System.out.println(responseJson);
Scale in a node group
You can decrease the number of nodes in a node group to scale in the node group. You can also specify the IDs of nodes that you want to remove to scale in a node group.
Decrease the number of nodes in a node group to scale in the node group
Important: Before you scale in a node group by using this method, update EMR SDK
for Java to version 3.3.8.
<!-- Maven dependency on EMR SDK for Java, version 3.3.8. -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-emr</artifactId>
<version>3.3.8</version>
</dependency>
// Scale in a node group by removing a given number of nodes from it.
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "LTAITY****", "2Sfp******");
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Specify the ID of the cluster.
request.setClusterId("C-01A1F4A********");
// Specify the ID of the node group. You can call the ListClusterHostGroup operation to obtain the ID of the node group.
request.setHostGroupId("G-D11D3E*******");
// Specify the number of nodes that you want to remove.
request.setReleaseNumber(3);
// Enable graceful decommissioning for the nodes that are removed.
request.setEnableGracefulDecommission(true);
// Specify the timeout period for graceful decommissioning of nodes. Unit: seconds. If you do not configure this parameter, the default timeout period 3600s is used.
request.setDecommissionTimeout(60);
// Send the request and print the response as JSON.
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));
If YARN decommissioning is enabled, change the value of the yarn.resourcemanager.nodes.exclude-path parameter to /etc/ecm/hadoop-conf/yarn-exclude.xml on the Configure tab of the YARN service in the EMR console. Then, save and deploy
the configuration on the Configure tab. Alternatively, you can adapt and run the following code to
change the value of the parameter:
// Set yarn.resourcemanager.nodes.exclude-path in yarn-site for the YARN service of the cluster.
IClientProfile clientProfile = DefaultProfile.getProfile("cn-hangzhou", "LTAITY****", "2Sfp******");
DefaultAcsClient acsClient = new DefaultAcsClient(clientProfile);

ModifyClusterServiceConfigRequest configRequest = new ModifyClusterServiceConfigRequest();

// Identify the cluster, region, and service whose configuration is changed.
configRequest.setClusterId("C-01A1F4A********");
configRequest.setRegionId("cn-hangzhou");
configRequest.setServiceName("YARN");

// The new parameter value, keyed by configuration file name.
configRequest.setConfigParams("{\"yarn-site\":{\"yarn.resourcemanager.nodes.exclude-path\":\"/etc/ecm/hadoop-conf/yarn-exclude.xml\"}}");
configRequest.setCustomConfigParams("{}");
configRequest.setComment("for decommission gracefully");
configRequest.setRefreshHostConfig(Boolean.TRUE);

// Send the request and print the response as JSON.
String responseJson = JSON.toJSONString(acsClient.getAcsResponse(configRequest));
System.out.println(responseJson);
Specify the IDs of the nodes that you want to remove to scale in a node group
// Scale in a node group by removing the nodes that have the specified instance IDs.
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "LTAITY****", "2Sfp******");
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Specify the ID of the cluster.
request.setClusterId("C-C52CF4246D10****");
// Specify the ID of the node group. You can call the ListClusterHostGroup operation to obtain the ID of the node group.
request.setHostGroupId("G-A24651D939AD****");
// Specify the IDs of the nodes that you want to remove.
List<String> instanceIds = new ArrayList<>();
instanceIds.add("i-xxxxxxx");
instanceIds.add("i-xxxxxxy");
// The instance ID list is passed to the request as a JSON-encoded string.
request.setInstanceIdList(JSON.toJSONString(instanceIds));
// Enable graceful decommissioning for the nodes that are removed.
request.setEnableGracefulDecommission(true);
// Specify the timeout period for graceful decommissioning of nodes. Unit: seconds. If you do not configure this parameter, the default timeout period 3600s is used.
request.setDecommissionTimeout(60);
// Send the request and print the response as JSON.
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));