E-MapReduce (EMR) Spark Streaming nodes let you process streaming data with high throughput. The nodes support fault tolerance, which helps you restore data streams when errors occur.
Prerequisites
Before you begin, ensure that you have:
An Alibaba Cloud EMR cluster created and registered to DataWorks. For more information, see DataStudio (old version): Associate an EMR computing resource
(Required for RAM users) The RAM user added to the DataWorks workspace as a member with the Develop or Workspace Administrator role. The Workspace Administrator role grants more permissions than this task requires, so assign it with caution. For more information, see Add workspace members and assign roles to them
A serverless resource group purchased and configured with workspace association and network settings. For more information, see Create and use a serverless resource group
A workflow created in DataStudio. For more information, see Create a workflow
Limitations
EMR Spark Streaming nodes run only on a serverless resource group or an exclusive resource group for scheduling. A serverless resource group is recommended.
EMR Spark Streaming nodes created in DataWorks cannot process data in Spark clusters created on the EMR on ACK page.
Step 1: Create an EMR Spark Streaming node
Go to the DataStudio page. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select your workspace from the drop-down list and click Go to Data Development.
Create an EMR Spark Streaming node.
In the workflow list, right-click a workflow name and choose Create Node > EMR > EMR Spark Streaming. > Note: Alternatively, move the pointer over the Create icon and choose Create Node > EMR > EMR Spark Streaming.
In the Create Node dialog box, set the Name, Engine Instance, Node Type, and Path parameters, then click Confirm. The configuration tab for the EMR Spark Streaming node opens. > Note: Node names can contain only letters, digits, underscores (_), and periods (.).
Step 2: Develop an EMR Spark Streaming task
Reference an EMR JAR resource (DataLake clusters)
If your EMR Spark Streaming node uses a DataLake cluster, follow these steps to reference an EMR JAR resource.
Before referencing, create the EMR JAR resource. The first time you use an EMR JAR resource, click Authorize to grant DataWorks access.
If the resources are too large to upload through the DataWorks console, store them in Hadoop Distributed File System (HDFS) and reference them directly in the node code. Example:
spark-submit --master yarn
--deploy-mode cluster
--name SparkPi
--driver-memory 4G
--driver-cores 1
--num-executors 5
--executor-memory 4G
--executor-cores 1
--class org.apache.spark.examples.JavaSparkPi
hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
Open the EMR Spark Streaming node to display its configuration tab.
Under Resource in the EMR folder, right-click the resource name and select Insert Resource Path. When the ##@resource_reference{""} clause appears on the configuration tab, the resource is referenced.
Write your task code. Replace the placeholder values with actual values for your environment.
##@resource_reference{"examples-1.2.0-shaded.jar"} --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>
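You might choose the HDFS route described at the start of this subsection instead of uploading the JAR through the console. In that case, the JAR must already be in HDFS before the node runs. The following is a minimal sketch. It assumes a host where the hdfs command-line client is already configured for your EMR cluster, such as a master node. The helper names stage_jar and run, and the DRY_RUN switch, are hypothetical. Only hdfs dfs -mkdir -p and hdfs dfs -put -f are standard HDFS shell commands.

```shell
#!/usr/bin/env bash
# Hypothetical helper: stage a large JAR in HDFS so the node code can reference
# it by an hdfs:/// URI. Assumes the `hdfs` client on this host is configured
# for the EMR cluster (for example, an EMR master node).

# Execute a command, or only print it when DRY_RUN=1.
run() {
  if [[ ${DRY_RUN:-0} == 1 ]]; then
    echo "$*"
  else
    "$@"
  fi
}

stage_jar() {
  local local_jar="$1"
  local hdfs_dir="${2:-/tmp/jars}"   # same directory as the HDFS example above
  run hdfs dfs -mkdir -p "$hdfs_dir" || return 1
  # -f overwrites an existing file with the same name.
  run hdfs dfs -put -f "$local_jar" "$hdfs_dir/" || return 1
  # Print the URI to use as the application JAR in spark-submit.
  echo "hdfs://${hdfs_dir}/$(basename "$local_jar")"
}
```

For example, stage_jar ./spark-examples_2.11-2.4.8.jar prints hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar as its last line. That is the path used in the HDFS spark-submit example. With DRY_RUN=1, the function only prints the HDFS commands, which lets you check the paths first.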
Write the spark-submit code
On the configuration tab, write the spark-submit command for the node. Example:
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>
Replace the placeholder values:
| Placeholder | Description | Example |
|---|---|---|
| <logService-project> | Log Service project name | my-log-project |
| <logService-store> | Log Service Logstore name | my-logstore |
| <group> | Consumer group name | my-consumer-group |
| <endpoint> | Log Service endpoint | cn-hangzhou.log.aliyuncs.com |
| <access-key-id> | AccessKey ID of your Alibaba Cloud account | LTAI5tXxx |
| <access-key-secret> | AccessKey secret of your Alibaba Cloud account | Not shown |
To get your AccessKey ID and AccessKey secret: log on to the DataWorks console, move the pointer over your profile picture in the upper-right corner, and select AccessKey Management.
> Note: In this example, the examples-1.2.0-shaded.jar JAR package is uploaded in the DataWorks console. EMR Spark Streaming nodes do not support comments in node code.
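Node code cannot contain comments, and every placeholder from the table above must be replaced. A quick local check before you commit can catch both mistakes. The following sketch shows such a check; it is not a DataWorks feature. The names lint_node_code and placeholder_re are hypothetical. The check makes two assumptions: a comment is any line whose first non-blank character is #, except the ##@resource_reference clause; and any token of the form <name> is an unreplaced placeholder.

```shell
#!/usr/bin/env bash
# Hypothetical pre-commit check for EMR Spark Streaming node code.
# Assumptions: any line whose first non-blank character is '#' is a comment,
# except the ##@resource_reference{...} clause inserted by DataWorks;
# any <name> token (for example, <endpoint>) is an unreplaced placeholder.

placeholder_re='<[A-Za-z][A-Za-z0-9_-]*>'

lint_node_code() {
  local code="$1" status=0 line trimmed leftover
  while IFS= read -r line; do
    # Strip leading whitespace before testing for a comment marker.
    trimmed="${line#"${line%%[![:space:]]*}"}"
    if [[ $trimmed == '#'* && $trimmed != '##@resource_reference{'* ]]; then
      echo "comment lines are not supported: $trimmed" >&2
      status=1
    fi
  done <<< "$code"
  # Report the first placeholder left unreplaced, if any.
  leftover=$(grep -Eo "$placeholder_re" <<< "$code" | head -n 1)
  if [[ -n $leftover ]]; then
    echo "unreplaced placeholder: $leftover" >&2
    status=1
  fi
  return "$status"
}
```

For example, lint_node_code "$(cat node_code.txt)" returns a nonzero status if the saved copy still contains a # comment line or a placeholder such as <endpoint>. It also reports the problem on standard error. Here, node_code.txt is a hypothetical local copy of the node code.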
If multiple EMR compute engines are associated with DataStudio, select the one to use. If only one is associated, no selection is needed.
(Optional) Override Spark parameters
Configure advanced parameters on the Advanced Settings tab to override default Spark behavior on a per-node basis. For the full list of configurable properties, see Spark Configuration.
The following parameters are available for DataLake clusters (EMR on ECS):
| Parameter | Description | Default |
|---|---|---|
| queue | The YARN scheduling queue for submitted jobs. For more information, see YARN schedulers. | default |
| priority | The job priority. | 1 |
| Others | Any SparkConf key-value pair. DataWorks appends the parameter to the spark-submit command when the node code is committed. Example: "spark.driver.memory" : "2g". To enable Ranger permission control, add spark.hadoop.fs.oss.authorization.method=ranger to the global Spark parameters so that Ranger permission control takes effect. For more parameters, see Configure global Spark parameters. | None |
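The Others row says that DataWorks appends each SparkConf pair to the spark-submit command. The exact rendering is not documented here. One plausible mental model assumes the standard spark-submit form --conf PROP=VALUE, as sketched below. to_conf_args is a hypothetical helper. It validates key=value pairs and emits one argument per line.

```shell
#!/usr/bin/env bash
# Sketch (assumption): turn "Others" SparkConf pairs into spark-submit arguments
# using the standard --conf PROP=VALUE form. DataWorks' actual rendering may
# differ. Output is one argument per line so it can be read into an array.

to_conf_args() {
  local pair key
  for pair in "$@"; do
    key="${pair%%=*}"
    # Reject entries without '=' or with an empty key.
    if [[ -z $key || $key == "$pair" ]]; then
      echo "expected key=value, got: $pair" >&2
      return 1
    fi
    printf '%s\n' --conf "$pair"
  done
}
```

Emitting one argument per line keeps a value that contains spaces together when you read it back with mapfile -t into a bash array. You can then pass that array to spark-submit as "${conf[@]}".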
Run the task
In the toolbar, click the Run icon. In the Parameters dialog box, select a resource group from the Resource Group Name drop-down list and click Run.
> Note:
- To access a compute resource over the Internet or a virtual private cloud (VPC), use the resource group for scheduling that is connected to that compute resource. For more information, see Network connectivity solutions.
- To change the resource group in a later run, click the Run with Parameters icon and select a different resource group in the Parameters dialog box.
Click the Save icon in the top toolbar to save the code.
(Optional) Perform smoke testing on the node in the development environment before or after committing it. For more information, see Perform smoke testing.
Step 3: Configure scheduling properties
To run the task on a schedule, click Properties in the right-side navigation pane and configure the scheduling properties for your business requirements. For more information, see Overview.
Configure the Rerun and Parent Nodes parameters on the Properties tab before committing the task.
Step 4: Deploy the task
Click the Save icon in the top toolbar to save the task.
Click the Submit icon to commit the task. In the Submit dialog box, enter a Change description and decide whether to enable code review.
> Note: Code review ensures task code quality. When code review is enabled, committed code can be deployed only after it passes review. For more information, see Code review.
For workspaces in standard mode: after committing, click Deploy in the upper-right corner of the configuration tab to deploy the task to the production environment. For more information, see Deploy nodes.
What's next
After the task is deployed, it runs on the schedule you configured. Click Operation Center in the upper-right corner of the configuration tab to monitor the scheduling status of the task. For more information, see View and manage auto triggered tasks.