This topic describes how to call the E-MapReduce (EMR) API to create and run a workflow and view the running results and logs.
Prerequisites
- An EMR cluster is created. For more information, see Create a cluster.
- The cluster ID is obtained. For more information, see View the cluster list and cluster details.
- An AccessKey pair is created. For more information, see Obtain an AccessKey pair.
- The required SDK is obtained. For the SDK for Java, see Download SDKs. For the SDK for Python, see Install SDK.
Scenarios
You have created a project in the China (Hangzhou) region, and the project contains three Hive SQL jobs that depend on each other.
Basic information of the project and jobs:
- The project name is emr_openapi_demo_project. The project ID is FP-D18E9976D5A****.
- Hive_SQL jobs:
- customer_logs: The job ID is FJ-79B3F70E23D6****.
- log_aggregation: The job ID is FJ-CECB36039155****.
- customer_pic: The job ID is FJ-99CFAE652650****.
Examples:
- Java
- In the emr_openapi_demo_project project, create a workflow named new_flow1. You can customize the workflow graph.
Parameter: CreateCluster
Description: Specifies whether to create a cluster for the workflow based on a cluster template.
- true: Create a cluster based on a cluster template. In this case, set ClusterId to the ID of the cluster template.
- false: Run the workflow in an existing cluster. In this case, set ClusterId to the ID of the existing cluster.

Parameter: graph
Description: The information of the workflow graph. The following code shows an example graph configuration:
{
  "nodes": [
    {
      "shape": "startControlNode",
      "type": "node",
      "size": "80*34",
      "x": 500.0,
      "y": 250.0,
      "id": "48d474ea",
      "index": 0.0,
      "attribute": {"type": "START"}
    },
    {
      "shape": "hiveSQLJobNode",
      "type": "node",
      "size": "170*34",
      "x": 498.0,
      "y": 324.5,
      "id": "cd5eb72d",
      "index": 1.0,
      "label": "customer_logs",
      "attribute": {
        "type": "JOB",
        "jobId": "FJ-79B3F70E23D6****",
        "jobType": "HIVE_SQL"
      }
    },
    {
      "shape": "hiveSQLJobNode",
      "type": "node",
      "size": "170*34",
      "trackerPath": "ec5a56bc4a261c22",
      "x": 497.0,
      "y": 416.5,
      "id": "b308995d",
      "index": 2.0,
      "label": "log_aggregation",
      "attribute": {
        "type": "JOB",
        "jobId": "FJ-CECB36039155****",
        "jobType": "HIVE_SQL"
      }
    },
    {
      "shape": "hiveSQLJobNode",
      "type": "node",
      "size": "170*34",
      "trackerPath": "ec5a56bc4a261c22",
      "x": 501.0,
      "y": 516.5,
      "id": "35c8d9c5",
      "index": 3.0,
      "label": "customer_pic",
      "attribute": {
        "type": "JOB",
        "jobId": "FJ-99CFAE652650****",
        "jobType": "HIVE_SQL"
      }
    },
    {
      "shape": "endControlNode",
      "type": "node",
      "size": "80*34",
      "x": 503.0,
      "y": 612.5,
      "id": "65f9c9a4",
      "index": 7.0,
      "attribute": {"type": "END"}
    }
  ],
  "edges": [
    {"source": "48d474ea", "sourceAnchor": 0.0, "target": "cd5eb72d", "targetAnchor": 0.0, "id": "3820959f", "index": 4.0},
    {"source": "cd5eb72d", "sourceAnchor": 1.0, "target": "b308995d", "targetAnchor": 0.0, "id": "248f9dd5", "index": 5.0},
    {"source": "b308995d", "sourceAnchor": 1.0, "target": "35c8d9c5", "targetAnchor": 0.0, "id": "dd21ddbf", "index": 6.0},
    {"source": "35c8d9c5", "sourceAnchor": 1.0, "target": "65f9c9a4", "targetAnchor": 0.0, "id": "7ab0cd5e", "index": 8.0}
  ]
}
Note:
- The graph code consists of the nodes and edges parts. You can specify the id parameter based on your business requirements, but make sure that the IDs are unique. The x, y, and size parameters specify the position and size of each node. Each graph must have a START node and an END node.
- You can also create a workflow in the EMR console by dragging nodes to the canvas. For more information, see Edit a workflow. Then, call the DescribeFlow operation to obtain the graph information from the Graph parameter in the response.
Create a workflow.

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.emr.model.v20160408.*;

public class CreateFlowForWeb {
    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        CreateFlowForWebRequest request = new CreateFlowForWebRequest();
        request.setRegionId("cn-hangzhou");
        request.setProjectId("FP-D18E9976D5A****");
        request.setName("new_flow1");
        request.setDescription("create flow by openAPI");
        request.setCreateCluster(false);
        request.setClusterId("C-B503DDB15B34****");
        request.setGraph("{\"nodes\":[{\"shape\":\"startControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"spmAnchorId\":\"0.0.0.i0.766645eb2cmNtQ\",\"x\":500.0,\"y\":250.0,\"id\":\"48d474ea\",\"index\":0.0,\"attribute\":{\"type\":\"START\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":498.0,\"y\":324.5,\"id\":\"cd5eb72d\",\"index\":1.0,\"label\":\"customer_logs\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-79B3F70E23D6****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":497.0,\"y\":416.5,\"id\":\"b308995d\",\"index\":2.0,\"label\":\"log_aggregation\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-CECB36039155****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":501.0,\"y\":516.5,\"id\":\"35c8d9c5\",\"index\":3.0,\"label\":\"customer_pic\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-99CFAE652650****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"endControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"trackerPath\":\"bb4b171fe9d130b4\",\"x\":503.0,\"y\":612.5,\"id\":\"65f9c9a4\",\"index\":7.0,\"attribute\":{\"type\":\"END\"}}],\"edges\":[{\"source\":\"48d474ea\",\"sourceAnchor\":0.0,\"target\":\"cd5eb72d\",\"targetAnchor\":0.0,\"id\":\"3820959f\",\"index\":4.0},{\"source\":\"cd5eb72d\",\"sourceAnchor\":1.0,\"target\":\"b308995d\",\"targetAnchor\":0.0,\"id\":\"248f9dd5\",\"index\":5.0},{\"source\":\"b308995d\",\"sourceAnchor\":1.0,\"target\":\"35c8d9c5\",\"targetAnchor\":0.0,\"id\":\"dd21ddbf\",\"index\":6.0},{\"source\":\"35c8d9c5\",\"sourceAnchor\":1.0,\"target\":\"65f9c9a4\",\"targetAnchor\":0.0,\"id\":\"7ab0cd5e\",\"index\":8.0}]}");

        try {
            CreateFlowForWebResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}
The ID of the workflow is returned:

{
  "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
  "Id": "F-AC0915556C24****"
}
- Submit the workflow.
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.emr.model.v20160408.*;

public class SubmitFlow {
    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        SubmitFlowRequest request = new SubmitFlowRequest();
        request.setRegionId("cn-hangzhou");
        request.setProjectId("FP-D18E9976D5A****");
        request.setFlowId("F-AC0915556C24****");

        try {
            SubmitFlowResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}

The ID of the instance on which the workflow runs is returned:

{
  "Data": "FI-521925BE9816****",
  "InstanceId": "FI-521925BE9816****",
  "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
  "Id": "FI-521925BE9816****"
}
- Query the information of the instance on which the workflow runs. The response shows the running status of the instance and of each workflow node.
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import java.util.*;
import com.aliyuncs.emr.model.v20160408.*;

public class DescribeFlowInstance {
    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        DescribeFlowInstanceRequest request = new DescribeFlowInstanceRequest();
        request.setRegionId("cn-hangzhou");
        request.setId("FI-521925BE9816****");
        request.setProjectId("FP-D18E9976D5A****");

        try {
            DescribeFlowInstanceResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}
- You can call the following operations based on the obtained running status of each
workflow node:
- DescribeFlowNodeInstance: queries the details of the instance on which a workflow node runs.
- DescribeFlowNodeInstanceLauncherLog: queries the launcher log of the instance on which a workflow node runs.
- ListFlowNodeInstanceContainerStatus: queries the status of the container of the instance on which a workflow node runs.
- Python
- In the emr_openapi_demo_project project, create a workflow named new_flow1. You can customize the workflow graph.
Parameter: CreateCluster
Description: Specifies whether to create a cluster for the workflow based on a cluster template.
- true: Create a cluster based on a cluster template. In this case, set ClusterId to the ID of the cluster template. The ID is in the format of C-xxx.
- false: Run the workflow in an existing cluster. In this case, set ClusterId to the ID of the existing cluster. The ID is in the format of C-xxx.

Parameter: graph
Description: The information of the workflow graph. The following code shows an example graph configuration:
{
  "nodes": [
    {
      "shape": "startControlNode",
      "type": "node",
      "size": "80*34",
      "x": 500.0,
      "y": 250.0,
      "id": "48d474ea",
      "index": 0.0,
      "attribute": {"type": "START"}
    },
    {
      "shape": "hiveSQLJobNode",
      "type": "node",
      "size": "170*34",
      "x": 498.0,
      "y": 324.5,
      "id": "cd5eb72d",
      "index": 1.0,
      "label": "customer_logs",
      "attribute": {
        "type": "JOB",
        "jobId": "FJ-79B3F70E23D6****",
        "jobType": "HIVE_SQL"
      }
    },
    {
      "shape": "hiveSQLJobNode",
      "type": "node",
      "size": "170*34",
      "trackerPath": "ec5a56bc4a261c22",
      "x": 497.0,
      "y": 416.5,
      "id": "b308995d",
      "index": 2.0,
      "label": "log_aggregation",
      "attribute": {
        "type": "JOB",
        "jobId": "FJ-CECB36039155****",
        "jobType": "HIVE_SQL"
      }
    },
    {
      "shape": "hiveSQLJobNode",
      "type": "node",
      "size": "170*34",
      "trackerPath": "ec5a56bc4a261c22",
      "x": 501.0,
      "y": 516.5,
      "id": "35c8d9c5",
      "index": 3.0,
      "label": "customer_pic",
      "attribute": {
        "type": "JOB",
        "jobId": "FJ-99CFAE652650****",
        "jobType": "HIVE_SQL"
      }
    },
    {
      "shape": "endControlNode",
      "type": "node",
      "size": "80*34",
      "x": 503.0,
      "y": 612.5,
      "id": "65f9c9a4",
      "index": 7.0,
      "attribute": {"type": "END"}
    }
  ],
  "edges": [
    {"source": "48d474ea", "sourceAnchor": 0.0, "target": "cd5eb72d", "targetAnchor": 0.0, "id": "3820959f", "index": 4.0},
    {"source": "cd5eb72d", "sourceAnchor": 1.0, "target": "b308995d", "targetAnchor": 0.0, "id": "248f9dd5", "index": 5.0},
    {"source": "b308995d", "sourceAnchor": 1.0, "target": "35c8d9c5", "targetAnchor": 0.0, "id": "dd21ddbf", "index": 6.0},
    {"source": "35c8d9c5", "sourceAnchor": 1.0, "target": "65f9c9a4", "targetAnchor": 0.0, "id": "7ab0cd5e", "index": 8.0}
  ]
}
Note:
- The graph code consists of the nodes and edges parts. You can specify the id parameter based on your business requirements, but make sure that the IDs are unique. The x, y, and size parameters specify the position and size of each node. Each graph must have a START node and an END node.
- You can also create a workflow in the EMR console by dragging nodes to the canvas. For more information, see Edit a workflow. Then, call the DescribeFlow operation to obtain the graph information from the Graph parameter in the response.
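Writing the graph string by hand is error-prone. The helper below is a minimal sketch (it is not part of the EMR SDK) that assembles a linear chain of Hive SQL jobs into the same nodes/edges structure and generates the required unique IDs; the coordinates and anchor values are simplified relative to the console-generated example above.

```python
import json
import uuid


def build_linear_flow_graph(jobs):
    """Build a graph dict for a linear chain of Hive SQL jobs.

    jobs: list of (label, job_id) tuples, executed in order.
    Node IDs must be unique within the graph; short uuid4 hex
    strings are one simple way to guarantee that.
    """
    new_id = lambda: uuid.uuid4().hex[:8]

    # Every graph needs exactly one START node and one END node.
    nodes = [{"shape": "startControlNode", "type": "node", "size": "80*34",
              "x": 500.0, "y": 250.0, "id": new_id(), "index": 0.0,
              "attribute": {"type": "START"}}]
    for i, (label, job_id) in enumerate(jobs, start=1):
        nodes.append({"shape": "hiveSQLJobNode", "type": "node", "size": "170*34",
                      "x": 500.0, "y": 250.0 + 90.0 * i, "id": new_id(),
                      "index": float(i), "label": label,
                      "attribute": {"type": "JOB", "jobId": job_id,
                                    "jobType": "HIVE_SQL"}})
    nodes.append({"shape": "endControlNode", "type": "node", "size": "80*34",
                  "x": 500.0, "y": 250.0 + 90.0 * (len(jobs) + 1),
                  "id": new_id(), "index": float(len(jobs) + 1),
                  "attribute": {"type": "END"}})

    # Connect consecutive nodes: START -> job1 -> ... -> jobN -> END.
    edges = [{"source": a["id"], "sourceAnchor": 0.0,
              "target": b["id"], "targetAnchor": 0.0,
              "id": new_id(), "index": float(len(nodes) + i)}
             for i, (a, b) in enumerate(zip(nodes, nodes[1:]))]
    return {"nodes": nodes, "edges": edges}


graph = build_linear_flow_graph([
    ("customer_logs", "FJ-79B3F70E23D6****"),
    ("log_aggregation", "FJ-CECB36039155****"),
    ("customer_pic", "FJ-99CFAE652650****"),
])
graph_json = json.dumps(graph)  # pass this string to request.set_Graph(...)
```

Serializing with json.dumps also avoids the escaping mistakes (stray backslashes, trailing commas) that tend to creep into hand-written graph strings.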
Create a workflow.

#!/usr/bin/env python
# coding=utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkemr.request.v20160408.CreateFlowForWebRequest import CreateFlowForWebRequest

client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

request = CreateFlowForWebRequest()
request.set_accept_format('json')
request.set_ProjectId("FP-D18E9976D5A****")
request.set_Name("new_flow1")
request.set_Description("create flow by openAPI")
request.set_CreateCluster(False)
request.set_ClusterId("C-B503DDB15B34****")
request.set_Graph("{\"nodes\":[{\"shape\":\"startControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"spmAnchorId\":\"0.0.0.i0.766645eb2cmNtQ\",\"x\":500.0,\"y\":250.0,\"id\":\"48d474ea\",\"index\":0.0,\"attribute\":{\"type\":\"START\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":498.0,\"y\":324.5,\"id\":\"cd5eb72d\",\"index\":1.0,\"label\":\"customer_logs\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-79B3F70E23D6****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":497.0,\"y\":416.5,\"id\":\"b308995d\",\"index\":2.0,\"label\":\"log_aggregation\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-CECB36039155****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":501.0,\"y\":516.5,\"id\":\"35c8d9c5\",\"index\":3.0,\"label\":\"customer_pic\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-99CFAE652650****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"endControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"trackerPath\":\"bb4b171fe9d130b4\",\"x\":503.0,\"y\":612.5,\"id\":\"65f9c9a4\",\"index\":7.0,\"attribute\":{\"type\":\"END\"}}],\"edges\":[{\"source\":\"48d474ea\",\"sourceAnchor\":0.0,\"target\":\"cd5eb72d\",\"targetAnchor\":0.0,\"id\":\"3820959f\",\"index\":4.0},{\"source\":\"cd5eb72d\",\"sourceAnchor\":1.0,\"target\":\"b308995d\",\"targetAnchor\":0.0,\"id\":\"248f9dd5\",\"index\":5.0},{\"source\":\"b308995d\",\"sourceAnchor\":1.0,\"target\":\"35c8d9c5\",\"targetAnchor\":0.0,\"id\":\"dd21ddbf\",\"index\":6.0},{\"source\":\"35c8d9c5\",\"sourceAnchor\":1.0,\"target\":\"65f9c9a4\",\"targetAnchor\":0.0,\"id\":\"7ab0cd5e\",\"index\":8.0}]}")

response = client.do_action_with_exception(request)
# python2: print(response)
print(str(response, encoding='utf-8'))
The ID of the workflow is returned:

{
  "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
  "Id": "F-AC0915556C24****"
}
- Submit the workflow.
#!/usr/bin/env python
# coding=utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkemr.request.v20160408.SubmitFlowRequest import SubmitFlowRequest

client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

request = SubmitFlowRequest()
request.set_accept_format('json')
request.set_ProjectId("FP-D18E9976D5A****")
request.set_FlowId("F-AC0915556C24****")

response = client.do_action_with_exception(request)
# python2: print(response)
print(str(response, encoding='utf-8'))
The ID of the instance on which the workflow runs is returned:

{
  "Data": "FI-521925BE9816****",
  "InstanceId": "FI-521925BE9816****",
  "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
  "Id": "FI-521925BE9816****"
}
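In this sample response, the Data, InstanceId, and Id fields all carry the same instance ID. Since do_action_with_exception returns the raw response body as bytes, a minimal sketch of extracting the ID for the follow-up DescribeFlowInstance call looks like this (the raw value below is the sample response shown above):

```python
import json

# Raw bytes as returned by client.do_action_with_exception(request).
raw = b'{"Data": "FI-521925BE9816****", "InstanceId": "FI-521925BE9816****", "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C", "Id": "FI-521925BE9816****"}'

body = json.loads(raw.decode('utf-8'))
instance_id = body["Id"]  # use this ID as the Id parameter of DescribeFlowInstance
```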
- Query the information of the instance on which the workflow runs. The response shows the running status of the instance and of each workflow node.
#!/usr/bin/env python
# coding=utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkemr.request.v20160408.DescribeFlowInstanceRequest import DescribeFlowInstanceRequest

client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

request = DescribeFlowInstanceRequest()
request.set_accept_format('json')
request.set_Id("FI-521925BE9816****")
request.set_ProjectId("FP-D18E9976D5A****")

response = client.do_action_with_exception(request)
# python2: print(response)
print(str(response, encoding='utf-8'))
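A workflow instance usually takes a while to finish, so DescribeFlowInstance is typically called in a polling loop until the instance reaches a terminal state. The sketch below is hypothetical: get_instance_status stands in for a wrapper around the DescribeFlowInstance call above that returns the instance's Status string, and the terminal state names are assumptions; check the API reference for the authoritative status list.

```python
import time

# Assumed terminal workflow-instance states; verify against the API reference.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "KILLED"}


def wait_for_instance(get_instance_status, poll_seconds=30,
                      timeout_seconds=3600, sleep=time.sleep):
    """Poll until the workflow instance reaches a terminal state.

    get_instance_status: zero-argument callable (e.g. a wrapper around
    DescribeFlowInstance) returning the instance's Status string.
    Returns the terminal status, or raises TimeoutError.
    """
    waited = 0
    while waited <= timeout_seconds:
        status = get_instance_status()
        if status in TERMINAL_STATES:
            return status
        sleep(poll_seconds)
        waited += poll_seconds
    raise TimeoutError(
        "workflow instance did not finish within %s seconds" % timeout_seconds)
```

For example, wait_for_instance(lambda: describe_status(client, "FI-521925BE9816****")) would block until the instance above succeeds, fails, or is killed, where describe_status is your own wrapper that issues the request and parses Status out of the JSON response.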
- You can call the following operations based on the obtained running status of each
workflow node:
- DescribeFlowNodeInstance: queries the details of the instance on which a workflow node runs.
- DescribeFlowNodeInstanceLauncherLog: queries the launcher log of the instance on which a workflow node runs.
- ListFlowNodeInstanceContainerStatus: queries the status of the container of the instance on which a workflow node runs.