This topic describes how to call EMR API operations to create and run a workflow and view the running results and logs.

Scenarios

You have created a project in the China (Hangzhou) region and created three Hive_SQL jobs that depend on each other in the project.

Basic information about the project and jobs:
  • The project name is emr_openapi_demo_project. The project ID is FP-D18E9976D5A****.
  • Hive_SQL jobs:
    • customer_logs: The job ID is FJ-79B3F70E23D6****.
    • log_aggregation: The job ID is FJ-CECB36039155****.
    • customer_pic: The job ID is FJ-99CFAE652650****.

Examples

  • Java
    1. In the emr_openapi_demo_project project, create a workflow named new_flow. You can customize the workflow graph.
      Parameters:
      • CreateCluster: Specifies whether to create a cluster for the workflow based on a cluster template.
        • true: Create a cluster based on a cluster template. In this case, set ClusterId to the ID of the cluster template.
        • false: Run the workflow in an existing cluster. In this case, set ClusterId to the ID of the existing cluster.
      • Graph: The information about the workflow graph.
      The following code shows the settings of the Graph parameter:
      {
          "nodes":[
              {
                  "shape":"startControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":500.0,
                  "y":250.0,
                  "id":"48d474ea",
                  "index":0.0,
                  "attribute":{"type":"START"}
              },
              {
                "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "x":498.0,
                  "y":324.5,
                  "id":"cd5eb72d",
                  "index":1.0,
                  "label":"customer_logs",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-79B3F70E23D6****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":497.0,
                  "y":416.5,
                  "id":"b308995d",
                  "index":2.0,
                  "label":"log_aggregation;",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-CECB36039155****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":501.0,
                  "y":516.5,
                  "id":"35c8d9c5",
                  "index":3.0,
                  "label":"customer_pic",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-99CFAE652650****",
                      "jobType":"HIVE_SQL"
                    }
              },
              {
                  "shape":"endControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":503.0,
                  "y":612.5,
                  "id":"65f9c9a4",
                  "index":7.0,
                  "attribute":{"type":"END"}
              }
          ],
          "edges":
          [
              {
                  "source":"48d474ea",
                  "sourceAnchor":0.0,
                  "target":"cd5eb72d",
                  "targetAnchor":0.0,
                  "id":"3820959f",
                  "index":4.0
              },
              {
                  "source":"cd5eb72d",
                  "sourceAnchor":1.0,
                  "target":"b308995d",
                  "targetAnchor":0.0,
                  "id":"248f9dd5",
                  "index":5.0
              },
              {
                  "source":"b308995d",
                  "sourceAnchor":1.0,
                  "target":"35c8d9c5",
                  "targetAnchor":0.0,
                  "id":"dd21ddbf",
                  "index":6.0
              },
              {
                  "source":"35c8d9c5",
                  "sourceAnchor":1.0,
                  "target":"65f9c9a4",
                  "targetAnchor":0.0,
                  "id":"7ab0cd5e",
                  "index":8.0
              }
              ]
      }
      Note
      • The graph code consists of the nodes and edges parts. You can specify the id parameter based on your business requirements. Make sure that the IDs are unique. x, y, and size specify the position and size of each node. Each graph must have a START node and an END node.
      • You can create a workflow in the EMR console by dragging nodes to the canvas. For more information, see Manage workflows. Then, you can call the DescribeFlow operation to obtain the graph information from the Graph parameter in the returned information.
      Create a workflow.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class CreateFlowForWeb {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              CreateFlowForWebRequest request = new CreateFlowForWebRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
        request.setName("new_flow");
              request.setDescription("create flow by openAPI");
              request.setCreateCluster(false);
              request.setClusterId("C-B503DDB15B34****");
              request.setGraph("{\"nodes\":[{\"shape\":\"startControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"spmAnchorId\":\"0.0.0.i0.766645eb2cmNtQ\",\"x\":500.0,\"y\":250.0,\"id\":\"48d474ea\",\"index\":0.0,\"attribute\":{\"type\":\"START\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":498.0,\"y\":324.5,\"id\":\"cd5eb72d\",\"index\":1.0,\"label\":\"customer_logs\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-79B3F70E23D6****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":497.0,\"y\":416.5,\"id\":\"b308995d\",\"index\":2.0,\"label\":\"log_aggregation;\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-CECB36039155****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":501.0,\"y\":516.5,\"id\":\"35c8d9c5\",\"index\":3.0,\"label\":\"customer_pic\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-99CFAE652650****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"endControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"trackerPath\":\"bb4b171fe9d130b4\",\"x\":503.0,\"y\":612.5,\"id\":\"65f9c9a4\",\"index\":7.0,\"attribute\":{\"type\":\"END\"}}],\"edges\":[{\"source\":\"48d474ea\",\"sourceAnchor\":0.0,\"target\":\"cd5eb72d\",\"targetAnchor\":0.0,\"id\":\"3820959f\",\"index\":4.0},{\"source\":\"cd5eb72d\",\"sourceAnchor\":1.0,\"target\":\"b308995d\",\"targetAnchor\":0.0,\"id\":\"248f9dd5\",\"index\":5.0},{\"source\":\"b308995d\",\"sourceAnchor\":1.0,\"target\":\"35c8d9c5\",\"targetAnchor\":0.0,\"id\":\"dd21ddbf\",\"index\":6.0},{\"source\":\"35c8d9c5\",\"sourceAnchor\":1.0,\"target\":\"65f9c9a4\",\"targetAnchor\":0.0,\"id\":\"7ab0cd5e\",\"index\":8.0}]}");
      
              try {
                  CreateFlowForWebResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
      The ID of the workflow is returned.
      {
          "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
          "Id": "F-AC0915556C24****"
      }
    2. Submit the workflow.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class SubmitFlow {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              SubmitFlowRequest request = new SubmitFlowRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setFlowId("F-AC0915556C24****");
      
              try {
                  SubmitFlowResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
      The ID of the instance on which the workflow runs is returned.
      {
          "Data": "FI-521925BE9816****",
          "InstanceId": "FI-521925BE9816****",
          "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
          "Id": "FI-521925BE9816****"
      }
    3. Query the information of the instance on which the workflow runs. View the running status of the instance and that of each workflow node in the returned information.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class DescribeFlowInstance {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              DescribeFlowInstanceRequest request = new DescribeFlowInstanceRequest();
              request.setRegionId("cn-hangzhou");
              request.setId("FI-521925BE9816****");
              request.setProjectId("FP-D18E9976D5A****");
      
              try {
                  DescribeFlowInstanceResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
    4. You can call the following operations based on the obtained running status of each workflow node:
      • DescribeFlowNodeInstance: queries the details of the instance on which a workflow node runs.
      • DescribeFlowNodeInstanceLauncherLog: queries the launcher log of the instance on which a workflow node runs.
      • ListFlowNodeInstanceContainerStatus: queries the status of the container of the instance on which a workflow node runs.
  • Python
    1. In the emr_openapi_demo_project project, create a workflow named new_flow. You can customize the workflow graph.
      Parameters:
      • CreateCluster: Specifies whether to create a cluster for the workflow based on a cluster template.
        • true: Create a cluster based on a cluster template. In this case, set ClusterId to the ID of the cluster template. The ID is in the format of C-xxx.
        • false: Run the workflow in an existing cluster. In this case, set ClusterId to the ID of the existing cluster. The ID is in the format of C-xxx.
      • Graph: The information about the workflow graph.
      The following code shows the settings of the Graph parameter:
      {
          "nodes":[
              {
                  "shape":"startControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":500.0,
                  "y":250.0,
                  "id":"48d474ea",
                  "index":0.0,
                  "attribute":{"type":"START"}
              },
              {
                "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "x":498.0,
                  "y":324.5,
                  "id":"cd5eb72d",
                  "index":1.0,
                  "label":"customer_logs",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-79B3F70E23D6****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":497.0,
                  "y":416.5,
                  "id":"b308995d",
                  "index":2.0,
                  "label":"log_aggregation;",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-CECB36039155****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":501.0,
                  "y":516.5,
                  "id":"35c8d9c5",
                  "index":3.0,
                  "label":"customer_pic",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-99CFAE652650****",
                      "jobType":"HIVE_SQL"
                    }
              },
              {
                  "shape":"endControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":503.0,
                  "y":612.5,
                  "id":"65f9c9a4",
                  "index":7.0,
                  "attribute":{"type":"END"}
              }
          ],
          "edges":
          [
              {
                  "source":"48d474ea",
                  "sourceAnchor":0.0,
                  "target":"cd5eb72d",
                  "targetAnchor":0.0,
                  "id":"3820959f",
                  "index":4.0
              },
              {
                  "source":"cd5eb72d",
                  "sourceAnchor":1.0,
                  "target":"b308995d",
                  "targetAnchor":0.0,
                  "id":"248f9dd5",
                  "index":5.0
              },
              {
                  "source":"b308995d",
                  "sourceAnchor":1.0,
                  "target":"35c8d9c5",
                  "targetAnchor":0.0,
                  "id":"dd21ddbf",
                  "index":6.0
              },
              {
                  "source":"35c8d9c5",
                  "sourceAnchor":1.0,
                  "target":"65f9c9a4",
                  "targetAnchor":0.0,
                  "id":"7ab0cd5e",
                  "index":8.0
              }
              ]
      }
      Note
      • The graph code consists of the nodes and edges parts. You can specify the id parameter based on your business requirements. Make sure that the IDs are unique. x, y, and size specify the position and size of each node. Each graph must have a START node and an END node.
      • You can create a workflow in the EMR console by dragging nodes to the canvas. For more information, see Manage workflows. Then, you can call the DescribeFlow operation to obtain the graph information from the Graph parameter in the returned information.
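      Hand-editing the escaped Graph string is error-prone. As a sketch (the node shapes and attribute fields are copied from the example graph above; the helper function names are our own, not part of the SDK), you can build the graph as Python dictionaries and serialize it with json.dumps:

```python
import json

def control_node(node_id, kind, x, y, index):
    """Build a START or END control node in the shape the Graph parameter expects."""
    shape = "startControlNode" if kind == "START" else "endControlNode"
    return {"shape": shape, "type": "node", "size": "80*34",
            "x": x, "y": y, "id": node_id, "index": index,
            "attribute": {"type": kind}}

def job_node(node_id, label, job_id, x, y, index):
    """Build a Hive SQL job node that references an existing EMR job."""
    return {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34",
            "x": x, "y": y, "id": node_id, "index": index, "label": label,
            "attribute": {"type": "JOB", "jobId": job_id, "jobType": "HIVE_SQL"}}

def edge(edge_id, source, target, index, source_anchor=0.0):
    """Connect two nodes; anchor values follow the example graph above."""
    return {"source": source, "sourceAnchor": source_anchor,
            "target": target, "targetAnchor": 0.0,
            "id": edge_id, "index": index}

nodes = [
    control_node("48d474ea", "START", 500.0, 250.0, 0.0),
    job_node("cd5eb72d", "customer_logs", "FJ-79B3F70E23D6****", 498.0, 324.5, 1.0),
    job_node("b308995d", "log_aggregation", "FJ-CECB36039155****", 497.0, 416.5, 2.0),
    job_node("35c8d9c5", "customer_pic", "FJ-99CFAE652650****", 501.0, 516.5, 3.0),
    control_node("65f9c9a4", "END", 503.0, 612.5, 7.0),
]
edges = [
    edge("3820959f", "48d474ea", "cd5eb72d", 4.0),
    edge("248f9dd5", "cd5eb72d", "b308995d", 5.0, 1.0),
    edge("dd21ddbf", "b308995d", "35c8d9c5", 6.0, 1.0),
    edge("7ab0cd5e", "35c8d9c5", "65f9c9a4", 8.0, 1.0),
]
graph = json.dumps({"nodes": nodes, "edges": edges})
```

      You can then pass the resulting string to request.set_Graph(graph) instead of writing the escaped JSON by hand.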
      Create a workflow.
      #! /usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.CreateFlowForWebRequest import CreateFlowForWebRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = CreateFlowForWebRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
request.set_Name("new_flow")
      request.set_Description("create flow by openAPI")
      request.set_CreateCluster(False)
      request.set_ClusterId("C-B503DDB15B34****")
      request.set_Graph(" {\"nodes\":[{\"shape\":\"startControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"spmAnchorId\":\"0.0.0.i0.766645eb2cmNtQ\",\"x\":500.0,\"y\":250.0,\"id\":\"48d474ea\",\"index\":0.0,\"attribute\":{\"type\":\"START\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":498.0,\"y\":324.5,\"id\":\"cd5eb72d\",\"index\":1.0,\"label\":\"customer_logs\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-79B3F70E23D6****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":497.0,\"y\":416.5,\"id\":\"b308995d\",\"index\":2.0,\"label\":\"log_aggregation;\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-CECB36039155****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":501.0,\"y\":516.5,\"id\":\"35c8d9c5\",\"index\":3.0,\"label\":\"customer_pic\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-99CFAE652650****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"endControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"trackerPath\":\"bb4b171fe9d130b4\",\"x\":503.0,\"y\":612.5,\"id\":\"65f9c9a4\",\"index\":7.0,\"attribute\":{\"type\":\"END\"}}],\"edges\":[{\"source\":\"48d474ea\",\"sourceAnchor\":0.0,\"target\":\"cd5eb72d\",\"targetAnchor\":0.0,\"id\":\"3820959f\",\"index\":4.0},{\"source\":\"cd5eb72d\",\"sourceAnchor\":1.0,\"target\":\"b308995d\",\"targetAnchor\":0.0,\"id\":\"248f9dd5\",\"index\":5.0},{\"source\":\"b308995d\",\"sourceAnchor\":1.0,\"target\":\"35c8d9c5\",\"targetAnchor\":0.0,\"id\":\"dd21ddbf\",\"index\":6.0},{\"source\":\"35c8d9c5\",\"sourceAnchor\":1.0,\"target\":\"65f9c9a4\",\"targetAnchor\":0.0,\"id\":\"7ab0cd5e\",\"index\":8.0}]}");
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      The ID of the workflow is returned.
      {
          "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
          "Id": "F-AC0915556C24****"
      }
    2. Submit the workflow.
      #! /usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.SubmitFlowRequest import SubmitFlowRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = SubmitFlowRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_FlowId("F-AC0915556C24****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      The ID of the instance on which the workflow runs is returned.
      {
          "Data": "FI-521925BE9816****",
          "InstanceId": "FI-521925BE9816****",
          "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
          "Id": "FI-521925BE9816****"
      }
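      In Python 3, do_action_with_exception returns the response body as bytes. A small sketch (using the sample response shown above) of extracting the instance ID so it can be passed to set_Id in the next step:

```python
import json

# Sample response body from SubmitFlow (taken from the example output above).
response = b'{"Data": "FI-521925BE9816****", "InstanceId": "FI-521925BE9816****", "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C", "Id": "FI-521925BE9816****"}'

body = json.loads(response.decode("utf-8"))
instance_id = body["Id"]  # pass this to DescribeFlowInstanceRequest.set_Id()
```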
    3. Query the information of the instance on which the workflow runs. View the running status of the instance and that of each workflow node in the returned information.
      #! /usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.DescribeFlowInstanceRequest import DescribeFlowInstanceRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = DescribeFlowInstanceRequest()
      request.set_accept_format('json')
      
      request.set_Id("FI-521925BE9816****")
      request.set_ProjectId("FP-D18E9976D5A****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
    4. You can call the following operations based on the obtained running status of each workflow node:
      • DescribeFlowNodeInstance: queries the details of the instance on which a workflow node runs.
      • DescribeFlowNodeInstanceLauncherLog: queries the launcher log of the instance on which a workflow node runs.
      • ListFlowNodeInstanceContainerStatus: queries the status of the container of the instance on which a workflow node runs.
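      DescribeFlowInstance returns a Status field for the instance, so you can poll until the workflow reaches a terminal state before querying node-level details. A polling sketch (the status values below are assumptions based on typical workflow states, not a confirmed list; check the DescribeFlowInstance reference for the authoritative values):

```python
import json
import time

# Assumed terminal states; verify against the DescribeFlowInstance API reference.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "ABEND", "STOPPED"}

def is_finished(status):
    """Return True when the flow instance has reached a terminal state."""
    return status in TERMINAL_STATUSES

def wait_for_instance(client, request, interval=30, timeout=3600):
    """Poll DescribeFlowInstance until the instance finishes (sketch).

    `client` and `request` are the AcsClient and DescribeFlowInstanceRequest
    objects built in step 3; `interval` and `timeout` are in seconds.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        body = json.loads(client.do_action_with_exception(request).decode("utf-8"))
        status = body.get("Status")
        if is_finished(status):
            return status
        time.sleep(interval)
    raise TimeoutError("flow instance did not finish within %s seconds" % timeout)
```

      Once the instance finishes, call the node-level operations listed above to inspect individual job runs and their logs.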

Related API operations