This topic describes how to call EMR API operations to create and run a job and view the running results.
Prerequisites
- An EMR cluster is created. For more information, see Create a cluster.
- The cluster ID is obtained. For more information, see View the cluster list and cluster details.
- An AccessKey pair is created. For more information, see Obtain an AccessKey pair.
- The required SDK is obtained. To obtain the SDK for Java, see Download SDKs. To obtain the SDK for Python, see Install SDK.
Scenarios
You have created a project in the China (Hangzhou) region. You want to create and run jobs in the project.
Basic configuration information of your project and cluster:
- The project name is emr_openapi_demo_project. The project ID is FP-D18E9976D5A****. If you do not have a project, create a project first. For more information, see Create and manage a project.
- The project is associated with the emr_openapi_demo cluster. The cluster ID is C-69CB0546800F****.
Note
- After you call the CreateFlowProject operation to create a project, you can obtain the project ID from the returned information.
- You can call the ListFlowProject operation to query all projects in the current region and find the project ID in the returned information.
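The lookup described in the note can be sketched in Python: given the JSON that ListFlowProject returns, find a project ID by project name. The nested Projects.Project shape and the sample payload below are illustrative assumptions, not the documented response schema.

```python
import json

# Sample ListFlowProject-style payload (assumed shape, for illustration only).
sample_response = json.dumps({
    "Projects": {
        "Project": [
            {"Name": "emr_openapi_demo_project", "Id": "FP-D18E9976D5A****"},
            {"Name": "another_project", "Id": "FP-1111111111****"},
        ]
    }
})

def find_project_id(raw, name):
    """Return the Id of the first project whose Name matches, or None."""
    projects = json.loads(raw).get("Projects", {}).get("Project", [])
    return next((p["Id"] for p in projects if p.get("Name") == name), None)

print(find_project_id(sample_response, "emr_openapi_demo_project"))
```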
Examples
- Java
- Create a Hive_SQL job. When you call the CreateFlowJob operation, you must specify the following parameters:
  - RegionId: The ID of the region, such as cn-hangzhou.
  - ProjectId: The ID of the project.
  - Name: The custom name of the job, such as emr_openapi_hivejob.
  - Type: The type of the job. Valid values: MR, Spark, Hive_SQL, Hive, Pig, Sqoop, Spark_SQL, Spark_Streaming, and Shell.
  - Description: The description of the job.
  - Adhoc: Specifies whether the job is a temporary query job. Valid values: true and false.
  Note: The ClusterId parameter is optional.

  import com.aliyuncs.DefaultAcsClient;
  import com.aliyuncs.IAcsClient;
  import com.aliyuncs.exceptions.ClientException;
  import com.aliyuncs.exceptions.ServerException;
  import com.aliyuncs.profile.DefaultProfile;
  import com.google.gson.Gson;
  import java.util.*;
  import com.aliyuncs.emr.model.v20160408.*;

  public class CreateFlowJob {
      public static void main(String[] args) {
          DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
          IAcsClient client = new DefaultAcsClient(profile);

          CreateFlowJobRequest request = new CreateFlowJobRequest();
          request.setRegionId("cn-hangzhou");
          request.setProjectId("FP-D18E9976D5A****");
          request.setName("emr_openapi_hivejob");
          request.setDescription("Hive_SQL job created by OpenAPI");
          request.setType("HIVE_SQL");
          request.setAdhoc(false);

          try {
              CreateFlowJobResponse response = client.getAcsResponse(request);
              System.out.println(new Gson().toJson(response));
          } catch (ServerException e) {
              e.printStackTrace();
          } catch (ClientException e) {
              System.out.println("ErrCode:" + e.getErrCode());
              System.out.println("ErrMsg:" + e.getErrMsg());
              System.out.println("RequestId:" + e.getRequestId());
          }
      }
  }
  The job ID is returned:
  {
      "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0",
      "Id": "FJ-E4157D27791D****"
  }
- Run the job. Call the SubmitFlowJob operation to submit the job. You can run the job on only one instance at a time. You must specify the following parameters:
  - ClusterId: The ID of the cluster in which the job runs.
  - JobId: The ID of the job. In this example, set this parameter to FJ-E4157D27791D****, which is returned in Step 1.
  - ProjectId: The ID of the project.
  - RegionId: The ID of the region, such as cn-hangzhou.

  import com.aliyuncs.DefaultAcsClient;
  import com.aliyuncs.IAcsClient;
  import com.aliyuncs.exceptions.ClientException;
  import com.aliyuncs.exceptions.ServerException;
  import com.aliyuncs.profile.DefaultProfile;
  import com.google.gson.Gson;
  import java.util.*;
  import com.aliyuncs.emr.model.v20160408.*;

  public class SubmitFlowJob {
      public static void main(String[] args) {
          DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
          IAcsClient client = new DefaultAcsClient(profile);

          SubmitFlowJobRequest request = new SubmitFlowJobRequest();
          request.setRegionId("cn-hangzhou");
          request.setProjectId("FP-D18E9976D5A****");
          request.setJobId("FJ-E4157D27791D****");
          request.setClusterId("C-69CB0546800F****");

          try {
              SubmitFlowJobResponse response = client.getAcsResponse(request);
              System.out.println(new Gson().toJson(response));
          } catch (ServerException e) {
              e.printStackTrace();
          } catch (ClientException e) {
              System.out.println("ErrCode:" + e.getErrCode());
              System.out.println("ErrMsg:" + e.getErrMsg());
              System.out.println("RequestId:" + e.getRequestId());
          }
      }
  }
  The ID of the instance on which the job runs is returned:
  {
      "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808",
      "Id": "FJI-54FEBB063136****"
  }
- Call the ListFlowNodeSqlResult operation to query the SQL query results of the instance.
You can call this operation only if the job type is Hive_SQL or Spark_SQL.
A maximum of 200 rows can be returned.
  You must specify the following parameters:
  - RegionId: The ID of the region, such as cn-hangzhou.
  - ProjectId: The ID of the project.
  - NodeInstanceId: The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.

  import com.aliyuncs.DefaultAcsClient;
  import com.aliyuncs.IAcsClient;
  import com.aliyuncs.exceptions.ClientException;
  import com.aliyuncs.exceptions.ServerException;
  import com.aliyuncs.profile.DefaultProfile;
  import com.google.gson.Gson;
  import java.util.*;
  import com.aliyuncs.emr.model.v20160408.*;

  public class ListFlowNodeSqlResult {
      public static void main(String[] args) {
          DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
          IAcsClient client = new DefaultAcsClient(profile);

          ListFlowNodeSqlResultRequest request = new ListFlowNodeSqlResultRequest();
          request.setRegionId("cn-hangzhou");
          request.setProjectId("FP-D18E9976D5A****");
          request.setNodeInstanceId("FJI-54FEBB063136****");

          try {
              ListFlowNodeSqlResultResponse response = client.getAcsResponse(request);
              System.out.println(new Gson().toJson(response));
          } catch (ServerException e) {
              e.printStackTrace();
          } catch (ClientException e) {
              System.out.println("ErrCode:" + e.getErrCode());
              System.out.println("ErrMsg:" + e.getErrMsg());
              System.out.println("RequestId:" + e.getRequestId());
          }
      }
  }
- Call the DescribeFlowNodeInstance operation to view the running details of the job, such as its status, start and end times, and duration. You must specify the following parameters:
  - RegionId: The ID of the region, such as cn-hangzhou.
  - ProjectId: The ID of the project.
  - NodeInstanceId: The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.

  import com.aliyuncs.DefaultAcsClient;
  import com.aliyuncs.IAcsClient;
  import com.aliyuncs.exceptions.ClientException;
  import com.aliyuncs.exceptions.ServerException;
  import com.aliyuncs.profile.DefaultProfile;
  import com.google.gson.Gson;
  import java.util.*;
  import com.aliyuncs.emr.model.v20160408.*;

  public class DescribeFlowNodeInstance {
      public static void main(String[] args) {
          DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
          IAcsClient client = new DefaultAcsClient(profile);

          DescribeFlowNodeInstanceRequest request = new DescribeFlowNodeInstanceRequest();
          request.setRegionId("cn-hangzhou");
          request.setNodeInstanceId("FJI-54FEBB063136****");
          request.setProjectId("FP-D18E9976D5A****");

          try {
              DescribeFlowNodeInstanceResponse response = client.getAcsResponse(request);
              System.out.println(new Gson().toJson(response));
          } catch (ServerException e) {
              e.printStackTrace();
          } catch (ClientException e) {
              System.out.println("ErrCode:" + e.getErrCode());
              System.out.println("ErrMsg:" + e.getErrMsg());
              System.out.println("RequestId:" + e.getRequestId());
          }
      }
  }
- Python
- Create a Hive_SQL job. You must specify the following parameters:
  - RegionId: The ID of the region, such as cn-hangzhou.
  - ProjectId: The ID of the project.
  - Name: The custom name of the job, such as emr_openapi_hivejob.
  - Type: The type of the job. Valid values: MR, Spark, Hive_SQL, Hive, Pig, Sqoop, Spark_SQL, Spark_Streaming, and Shell.
  - Description: The description of the job.
  - Adhoc: Specifies whether the job is a temporary query job. Valid values: true and false.
  Note: The ClusterId parameter is optional.

  #!/usr/bin/env python
  #coding=utf-8
  from aliyunsdkcore.client import AcsClient
  from aliyunsdkcore.acs_exception.exceptions import ClientException
  from aliyunsdkcore.acs_exception.exceptions import ServerException
  from aliyunsdkemr.request.v20160408.CreateFlowJobRequest import CreateFlowJobRequest

  client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

  request = CreateFlowJobRequest()
  request.set_accept_format('json')
  request.set_ProjectId("FP-D18E9976D5A****")
  request.set_Name("emr_openapi_hivejob")
  request.set_Description("Hive_SQL job created by OpenAPI")
  request.set_Type("HIVE_SQL")
  request.set_Adhoc(False)

  response = client.do_action_with_exception(request)
  # python2: print(response)
  print(str(response, encoding='utf-8'))
  The job ID is returned:
  {
      "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0",
      "Id": "FJ-E4157D27791D****"
  }
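do_action_with_exception returns the response as raw bytes, so the job ID needed by SubmitFlowJob in the next step can be extracted with the standard json module. A minimal sketch using the sample response shown above:

```python
import json

# Raw bytes as returned by do_action_with_exception (sample payload from above).
raw = b'{"RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0", "Id": "FJ-E4157D27791D****"}'

job_id = json.loads(raw)["Id"]  # pass this value to SubmitFlowJob as JobId
print(job_id)
```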
- Run the job. Call the SubmitFlowJob operation to submit the job. You can run the job on only one instance at a time. You must specify the following parameters:
  - ClusterId: The ID of the cluster in which the job runs.
  - JobId: The ID of the job. In this example, set this parameter to FJ-E4157D27791D****, which is returned in Step 1.
  - ProjectId: The ID of the project.
  - RegionId: The ID of the region, such as cn-hangzhou.

  #!/usr/bin/env python
  #coding=utf-8
  from aliyunsdkcore.client import AcsClient
  from aliyunsdkcore.acs_exception.exceptions import ClientException
  from aliyunsdkcore.acs_exception.exceptions import ServerException
  from aliyunsdkemr.request.v20160408.SubmitFlowJobRequest import SubmitFlowJobRequest

  client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

  request = SubmitFlowJobRequest()
  request.set_accept_format('json')
  request.set_ProjectId("FP-D18E9976D5A****")
  request.set_JobId("FJ-E4157D27791D****")
  request.set_ClusterId("C-69CB0546800F****")

  response = client.do_action_with_exception(request)
  # python2: print(response)
  print(str(response, encoding='utf-8'))
  The ID of the instance on which the job runs is returned:
  {
      "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808",
      "Id": "FJI-54FEBB063136****"
  }
- Call the ListFlowNodeSqlResult operation to query the SQL query results of the instance.
You can call this operation only if the job type is Hive_SQL or Spark_SQL.
A maximum of 200 rows can be returned.
  You must specify the following parameters:
  - RegionId: The ID of the region, such as cn-hangzhou.
  - ProjectId: The ID of the project.
  - NodeInstanceId: The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.

  #!/usr/bin/env python
  #coding=utf-8
  from aliyunsdkcore.client import AcsClient
  from aliyunsdkcore.acs_exception.exceptions import ClientException
  from aliyunsdkcore.acs_exception.exceptions import ServerException
  from aliyunsdkemr.request.v20160408.ListFlowNodeSqlResultRequest import ListFlowNodeSqlResultRequest

  client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

  request = ListFlowNodeSqlResultRequest()
  request.set_accept_format('json')
  request.set_ProjectId("FP-D18E9976D5A****")
  request.set_NodeInstanceId("FJI-54FEBB063136****")

  response = client.do_action_with_exception(request)
  # python2: print(response)
  print(str(response, encoding='utf-8'))
- Call the DescribeFlowNodeInstance operation to view the running details of the job, such as its status, start and end times, and duration. You must specify the following parameters:
  - RegionId: The ID of the region, such as cn-hangzhou.
  - ProjectId: The ID of the project.
  - NodeInstanceId: The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.

  #!/usr/bin/env python
  #coding=utf-8
  from aliyunsdkcore.client import AcsClient
  from aliyunsdkcore.acs_exception.exceptions import ClientException
  from aliyunsdkcore.acs_exception.exceptions import ServerException
  from aliyunsdkemr.request.v20160408.DescribeFlowNodeInstanceRequest import DescribeFlowNodeInstanceRequest

  client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

  request = DescribeFlowNodeInstanceRequest()
  request.set_accept_format('json')
  request.set_Id("FJI-54FEBB063136****")
  request.set_ProjectId("FP-D18E9976D5A****")

  response = client.do_action_with_exception(request)
  # python2: print(response)
  print(str(response, encoding='utf-8'))
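In practice, a submitted instance is usually polled until it reaches a terminal state before its details or SQL results are read. The loop below wraps a caller-supplied fetch function; the terminal status values and the stub standing in for repeated DescribeFlowNodeInstance calls are illustrative assumptions, not the documented schema.

```python
import time

TERMINAL_STATES = {"OK", "FAILED", "KILLED"}  # assumed terminal status values

def wait_for_instance(fetch_status, interval=5.0, max_polls=60):
    """Poll fetch_status() until it returns a terminal state or max_polls is exhausted."""
    for _ in range(max_polls):
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(interval)
    raise TimeoutError("instance did not reach a terminal state")

# Stub standing in for repeated DescribeFlowNodeInstance calls.
statuses = iter(["SUBMITTED", "RUNNING", "OK"])
print(wait_for_instance(lambda: next(statuses), interval=0.0))  # prints OK
```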