This topic describes how to call EMR API operations to create and run a job and view the running results.

Prerequisites

You have created a project in the China (Hangzhou) region.

Scenarios

You want to create and run jobs in the project and view the running results.

Basic configuration information of your project and cluster:
  • The project name is emr_openapi_demo_project. The project ID is FP-D18E9976D5A****. If you do not have a project, create a project first. For more information, see Create and manage a project.
  • The project is associated with the emr_openapi_demo cluster. The cluster ID is C-69CB0546800F****.
Note
  • After you call the CreateFlowProject operation to create a project, you can obtain the project ID from the returned information.
  • You can call the ListFlowProject operation to query all projects in the current region and find the project ID in the returned information.
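If you only have the raw JSON string returned by an SDK call such as ListFlowProject, the project ID can be extracted programmatically. The following is a minimal sketch: the response layout (a Projects.Project list with Id and Name fields) is an assumption, so verify it against the actual response returned by your SDK version.

```python
import json

# Hypothetical response body in the shape ListFlowProject is assumed to
# return; verify the field names against your SDK version.
sample_response = json.dumps({
    "RequestId": "example-request-id",
    "Projects": {
        "Project": [
            {"Id": "FP-D18E9976D5A****", "Name": "emr_openapi_demo_project"}
        ]
    }
})

def find_project_id(raw_json, project_name):
    """Return the Id of the first project whose Name matches, or None."""
    body = json.loads(raw_json)
    for project in body.get("Projects", {}).get("Project", []):
        if project.get("Name") == project_name:
            return project.get("Id")
    return None

print(find_project_id(sample_response, "emr_openapi_demo_project"))
```

The same pattern applies to the CreateFlowJob and SubmitFlowJob responses shown in the examples below, where json.loads(response)["Id"] yields the job or instance ID.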

Examples

  • Java
    1. Create a Hive_SQL job. When you call the CreateFlowJob operation, you must specify the parameters listed in the following table.
      Parameter Description
      RegionId The ID of the region, such as cn-hangzhou.
      ProjectId The ID of the project.
      Name The custom name of the job, such as emr_openapi_hivejob.
      Type The type of the job. Valid values: MR, SPARK, HIVE_SQL, HIVE, PIG, SQOOP, SPARK_SQL, SPARK_STREAMING, and SHELL.
      Description The description of the job.
      Adhoc Specifies whether the job is a temporary query job. Valid values: true and false.
      Note The ClusterId parameter is optional.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      public class CreateFlowJob {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              CreateFlowJobRequest request = new CreateFlowJobRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setName("emr_openapi_hivejob");
              request.setDescription("Hive_SQL job created by OpenAPI");
              request.setType("HIVE_SQL");
              request.setAdhoc(false);
              try {
                  CreateFlowJobResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
      The job ID is returned.
      {
          "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0",
          "Id": "FJ-E4157D27791D****"
      }
    2. Run the job. Call the SubmitFlowJob operation to submit the job. Each submission runs the job once and returns the ID of the resulting job instance. You must specify the parameters listed in the following table.
      Parameter Description
      ClusterId The ID of the cluster in which the job runs.
      JobId The ID of the job. In this example, set this parameter to FJ-E4157D27791D****, which is returned in Step 1.
      ProjectId The ID of the project.
      RegionId The ID of the region, such as cn-hangzhou.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      public class SubmitFlowJob {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              SubmitFlowJobRequest request = new SubmitFlowJobRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setJobId("FJ-E4157D27791D****");
              request.setClusterId("C-69CB0546800F****");
              try {
                  SubmitFlowJobResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
      The ID of the instance on which the job runs is returned.
      {
          "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808",
          "Id": "FJI-54FEBB063136****"
      }
    3. Call the ListFlowNodeSqlResult operation to query the SQL results of the job instance. This operation is supported only for jobs of the Hive_SQL or Spark_SQL type.

      A maximum of 200 rows can be returned.

      You must specify the parameters listed in the following table.
      Parameter Description
      RegionId The ID of the region, such as cn-hangzhou.
      ProjectId The ID of the project.
      NodeInstanceId The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class ListFlowNodeSqlResult {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              ListFlowNodeSqlResultRequest request = new ListFlowNodeSqlResultRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setNodeInstanceId("FJI-54FEBB063136****");
              try {
                  ListFlowNodeSqlResultResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
    4. Call the DescribeFlowNodeInstance operation to view the details of the job run, such as the running status, the start and end times, and the running duration. You must specify the parameters listed in the following table.
      Parameter Description
      RegionId The ID of the region, such as cn-hangzhou.
      ProjectId The ID of the project.
      Id The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      public class DescribeFlowNodeInstance {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              DescribeFlowNodeInstanceRequest request = new DescribeFlowNodeInstanceRequest();
              request.setRegionId("cn-hangzhou");
              request.setId("FJI-54FEBB063136****");
              request.setProjectId("FP-D18E9976D5A****");
              try {
                  DescribeFlowNodeInstanceResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
  • Python
    1. Create a Hive_SQL job. You must specify the parameters listed in the following table.
      Parameter Description
      RegionId The ID of the region, such as cn-hangzhou.
      ProjectId The ID of the project.
      Name The custom name of the job, such as emr_openapi_hivejob.
      Type The type of the job. Valid values: MR, SPARK, HIVE_SQL, HIVE, PIG, SQOOP, SPARK_SQL, SPARK_STREAMING, and SHELL.
      Description The description of the job.
      Adhoc Specifies whether the job is a temporary query job. Valid values: true and false.
      Note The ClusterId parameter is optional.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.CreateFlowJobRequest import CreateFlowJobRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = CreateFlowJobRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_Name("emr_openapi_hivejob")
      request.set_Description("Hive_SQL job created by OpenAPI")
      request.set_Type("HIVE_SQL")
      request.set_Adhoc(False)
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      The job ID is returned.
      {
          "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0",
          "Id": "FJ-E4157D27791D****"
      }
    2. Run the job. Call the SubmitFlowJob operation to submit the job. Each submission runs the job once and returns the ID of the resulting job instance. You must specify the parameters listed in the following table.
      Parameter Description
      ClusterId The ID of the cluster in which the job runs.
      JobId The ID of the job. In this example, set this parameter to FJ-E4157D27791D****, which is returned in Step 1.
      ProjectId The ID of the project.
      RegionId The ID of the region, such as cn-hangzhou.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.SubmitFlowJobRequest import SubmitFlowJobRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = SubmitFlowJobRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_JobId("FJ-E4157D27791D****")
      request.set_ClusterId("C-69CB0546800F****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      The ID of the instance on which the job runs is returned.
      {
          "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808",
          "Id": "FJI-54FEBB063136****"
      }
    3. Call the ListFlowNodeSqlResult operation to query the SQL results of the job instance. This operation is supported only for jobs of the Hive_SQL or Spark_SQL type.

      A maximum of 200 rows can be returned.

      You must specify the parameters listed in the following table.
      Parameter Description
      RegionId The ID of the region, such as cn-hangzhou.
      ProjectId The ID of the project.
      NodeInstanceId The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.ListFlowNodeSqlResultRequest import ListFlowNodeSqlResultRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = ListFlowNodeSqlResultRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_NodeInstanceId("FJI-54FEBB063136****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
    4. Call the DescribeFlowNodeInstance operation to view the details of the job run, such as the running status, the start and end times, and the running duration. You must specify the parameters listed in the following table.
      Parameter Description
      RegionId The ID of the region, such as cn-hangzhou.
      ProjectId The ID of the project.
      Id The ID of the instance. In this example, set this parameter to FJI-54FEBB063136****, which is returned in Step 2.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.DescribeFlowNodeInstanceRequest import DescribeFlowNodeInstanceRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = DescribeFlowNodeInstanceRequest()
      request.set_accept_format('json')
      
      request.set_Id("FJI-54FEBB063136****")
      request.set_ProjectId("FP-D18E9976D5A****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
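Because a submitted job instance runs asynchronously, DescribeFlowNodeInstance is typically polled until the instance reaches a terminal state. The helper below is a sketch only: it takes the fetch call as a parameter so it can be exercised without credentials or a cluster, and the status values (OK, FAILED, KILLED) are assumptions to verify against the DescribeFlowNodeInstance reference for your EMR version.

```python
import json
import time

# Assumed terminal Status values; check the DescribeFlowNodeInstance
# reference for the exact set your EMR version returns.
TERMINAL_STATES = {"OK", "FAILED", "KILLED"}

def wait_for_instance(fetch, interval_seconds=5, max_attempts=60):
    """Poll until the job instance reaches a terminal state.

    `fetch` is a callable returning the raw JSON string of a
    DescribeFlowNodeInstance call, e.g.
    lambda: str(client.do_action_with_exception(request), encoding='utf-8').
    """
    for attempt in range(max_attempts):
        status = json.loads(fetch()).get("Status")
        if status in TERMINAL_STATES:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError("job instance did not reach a terminal state")

# Stubbed usage so the helper can be tried without an EMR cluster.
responses = iter(['{"Status": "RUNNING"}', '{"Status": "OK"}'])
print(wait_for_instance(lambda: next(responses), interval_seconds=0))
# → OK
```

Passing the fetch call as a parameter keeps the polling logic separate from the SDK client, which also makes the helper easy to test with stubbed responses.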

Related API operations