E-MapReduce (EMR) Spark nodes allow you to perform complex in-memory data analysis and help you build large-scale, low-latency data analysis applications. This topic describes how to create an EMR Spark node and provides examples to show how the features of an EMR Spark node work.
Prerequisites
The preparations for creating a node are complete for both EMR and DataWorks. The preparations vary based on the type of your EMR cluster. EMR provides the following types of clusters:
- Data lake clusters: For more information about the preparations, see Configure an EMR data lake cluster and Configure DataWorks.
- Hadoop clusters: For more information about the preparations, see Associate an EMR cluster with a DataWorks workspace as an EMR compute engine instance.
Limits
- EMR Spark nodes can be run only on an exclusive resource group for scheduling.
- DataWorks no longer allows you to associate an EMR Hadoop cluster with a DataWorks workspace. However, the EMR Hadoop clusters that are associated with your DataWorks workspace can still be used.
Precautions
If you commit a node by using spark-submit, we recommend that you set deploy-mode to cluster rather than client.
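For reference, a minimal cluster-mode submission might look similar to the following sketch. The class name and JAR path are placeholders, not part of this topic's examples:
spark-submit --master yarn --deploy-mode cluster --class com.example.YourMainClass your-app.jar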
Procedure
- Go to the DataStudio page.
- Log on to the DataWorks console.
- In the left-side navigation pane, click Workspaces.
- In the top navigation bar, select the region where your workspace resides. Find your workspace and click DataStudio in the Actions column.
- Create a workflow. If you have an existing workflow, skip this step.
- Move the pointer over the Create icon and select Create Workflow.
- In the Create Workflow dialog box, configure the Workflow Name parameter.
- Click Create.
- Create an EMR Spark node.
- Create and reference an EMR JAR resource. If you use an EMR data lake cluster, you can perform the following steps to reference the EMR JAR resource.
Note If an EMR Spark node depends on a large number of resources, the resources cannot be uploaded by using the DataWorks console. In this case, you can store the resources in Hadoop Distributed File System (HDFS) and then reference the resources in the code of the EMR Spark node. The following command shows how to reference a JAR resource that is stored in HDFS:
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
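A minimal sketch of how such a JAR might be uploaded to HDFS with standard Hadoop shell commands. The local file name and destination directory are assumptions based on the path that the preceding command references:
hdfs dfs -mkdir -p /tmp/jars
hdfs dfs -put spark-examples_2.11-2.4.8.jar /tmp/jars/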
- Configure the parameters on the Advanced Settings tab. The advanced parameters that you can configure vary based on the type of your EMR cluster.
Data lake cluster:
- "queue": the scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN queues, see Basic queue configurations.
- "priority": the priority. Default value: 1.
Note
- You can also add a custom SparkConf parameter for the EMR Spark node on the Advanced Settings tab, such as "spark.eventLog.enabled" : false. When you commit the code of the EMR Spark node, DataWorks adds the custom parameter to the code in the --conf key=value format.
- If you want to use a system parameter of Spark, such as --executor-memory 2G, in an EMR Spark node, you must add the parameter to the code of the EMR Spark node.
- You can use Spark nodes to submit jobs on YARN only if the nodes are in cluster or local mode. Metadata lineage is supported only for Spark 2.x in cluster mode.
Hadoop cluster:
- "queue": the scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN queues, see Basic queue configurations.
- "vcores": the number of vCPUs. Default value: 1.
- "memory": the memory that is allocated to the launcher. Unit: MB. Default value: 2048.
- "priority": the priority. Default value: 1.
- "FLOW_SKIP_SQL_ANALYZE": the manner in which SQL statements are executed. Valid values:
  - true: Multiple SQL statements are executed at a time.
  - false: Only one SQL statement is executed at a time.
- "USE_GATEWAY": specifies whether a gateway cluster is used to commit jobs on the current node. Valid values:
  - true: A gateway cluster is used to commit jobs.
  - false: No gateway cluster is used to commit jobs. Jobs are automatically committed to the master node.
Note If the EMR cluster to which the node belongs is not associated with a gateway cluster but the USE_GATEWAY parameter is set to true, jobs may fail to be committed.
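For illustration only: assuming that the custom SparkConf entry "spark.eventLog.enabled" : false from the preceding note is configured on the Advanced Settings tab, DataWorks appends it in the --conf key=value format, so the command that is actually run might look similar to the following sketch. The class and JAR path are taken from the SparkPi example later in this topic:
spark-submit --conf spark.eventLog.enabled=false --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100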
- Configure scheduling properties for the EMR Spark node. If you want the system to periodically run the EMR Spark node, you can click Properties in the right-side navigation pane to configure properties for the node based on your business requirements.
- Configure basic properties for the EMR Spark node. For more information, see Configure basic properties.
- Configure the scheduling cycle, rerun properties, and scheduling dependencies of the EMR Spark node. For more information, see Configure time properties and Configure same-cycle scheduling dependencies. Note Before you commit the EMR Spark node, you must configure the Rerun and Parent Nodes parameters on the Properties tab.
- Configure resource properties for the EMR Spark node. For more information, see Configure the resource property. If the EMR Spark node that you created is an auto triggered node and you want the node to access the Internet or a virtual private cloud (VPC), you must select a resource group for scheduling that is connected to the required network. For more information, see Establish a network connection between a resource group and a data source.
- Commit and deploy the EMR Spark node.
- Click the Save icon in the top toolbar to save the node.
- Click the Commit icon in the top toolbar to commit the node.
- In the Commit Node dialog box, configure the Change description parameter.
- Click OK.
If you use a workspace in standard mode, you must deploy the node in the production environment after you commit the node. On the left side of the top navigation bar, click Deploy. For more information, see Deploy nodes.
- View the EMR Spark node.
- Click Operation Center in the upper-right corner of the configuration tab of the EMR Spark node to go to Operation Center in the production environment.
- View the scheduled EMR Spark node. For more information, see View and manage auto triggered nodes.
To view more information about the node, click Operation Center in the top navigation bar of the DataStudio page. For more information, see Overview.
Data development example 1: Test whether the EMR Spark node can be successfully run by calculating the Pi value
In this example, the SparkPi program of Spark is used to test whether the EMR Spark node can be successfully run. For more information about the sample project of Spark, see Use the sample project.
- Obtain the storage path of the JAR package spark-examples_2.11-2.4.5.jar of the sample project. Spark components are installed in the /usr/lib/spark-current path. Log on to the master node of the desired EMR cluster in the EMR console to obtain the complete path /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar. For more information, see Common file paths.
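To confirm that the JAR package exists at this path, you can list the directory after you log on to the master node. A minimal sketch:
ls /usr/lib/spark-current/examples/jars/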
- On the configuration tab of the EMR Spark node, write code for the node. For more information about how to create an EMR Spark node, see Procedure. In this example, the following code is used:
--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
You need to complete only the content that follows spark-submit. When you commit the EMR Spark node, the spark-submit prefix is automatically added. The following code shows the syntax of spark-submit and the command that is actually run:
# spark-submit [options] --class [MainClass] xxx.jar args
spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
- Save and commit the EMR Spark node. For more information, see Save and commit the EMR Spark node.
- View the run result. If information similar to the following is returned in the run log, the EMR Spark node is successfully run:
1097: Pi is roughly 3.1415547141554714
Data development example 2: Use Spark to access MaxCompute
This example demonstrates the features of the EMR Spark node by using Spark to obtain the number of rows in a MaxCompute table. For more information, see Use Spark to access MaxCompute.
- Prepare the environment.
- Associate an EMR compute engine instance and a MaxCompute compute engine instance with the workspace. For more information, see Create and manage workspaces.
- Activate OSS and create a bucket. For more information, see Create buckets.
- Create an IntelliJ IDEA project in Scala.
- Prepare test data. Create an ODPS SQL node on the DataStudio page. Then, execute SQL statements to create a MaxCompute table and insert data into the table. In this example, the following statements are executed to set the data type of the first column of the MaxCompute table to BIGINT and insert two data records into the table. For more information about how to create an ODPS SQL node, see Create an ODPS SQL node.
DROP TABLE IF EXISTS emr_spark_read_odpstable;
CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable
(
    id BIGINT,
    name STRING
);
INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi');
- Create a Spark Maven project and add Project Object Model (POM) dependencies. For more information, see Preparations. The following code provides an example of the POM dependencies. You can modify the code based on your business requirements.
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-maxcompute_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <configuration>
                <recompileMode>incremental</recompileMode>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
- Use Spark to obtain the number of rows in which data is of the BIGINT type in the first column of the MaxCompute table. For more information, see Use Spark to access MaxCompute. The following code provides an example:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.aliyun.emr.example.spark

import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}

object SparkMaxComputeDemo {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
          |
          |Arguments:
          |
          |  accessKeyId      Aliyun Access Key ID.
          |  accessKeySecret  Aliyun Key Secret.
          |  envType          0 or 1
          |                   0: Public environment.
          |                   1: Aliyun internal environment, i.e. Aliyun ECS etc.
          |  project          Aliyun ODPS project
          |  table            Aliyun ODPS table
          |  numPartitions    the number of RDD partitions
        """.stripMargin)
      System.exit(1)
    }

    val accessKeyId = args(0)
    val accessKeySecret = args(1)
    val envType = args(2).toInt
    val project = args(3)
    val table = args(4)
    val numPartitions = args(5).toInt

    val urls = Seq(
      Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"),        // public environment
      Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
    )

    val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
    val sc = new SparkContext(conf)
    val odpsOps = envType match {
      case 0 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
      case 1 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
    }

    val odpsData = odpsOps.readTable(project, table, read, numPartitions)

    println(s"Count (odpsData): ${odpsData.count()}")
  }

  def read(record: Record, schema: TableSchema): Long = {
    record.getBigint(0)
  }
}
Package the preceding code into a JAR package. In this example, the JAR package emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar is generated.
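With the maven-assembly-plugin configuration shown earlier, the JAR package with dependencies is typically produced by a standard Maven build. A minimal sketch, run from the project root:
mvn clean package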
Note Dependencies that are related to MaxCompute are contained in a third-party package. You must package the third-party package together with your code into the JAR package and upload the JAR package to your EMR cluster.
- Upload the JAR package.
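One way to upload the JAR package is the ossutil command-line tool. A minimal sketch, assuming the OSS bucket and path that the submission command in the next step references; your bucket and path may differ:
ossutil cp emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar oss://onaliyun-bucket-2/emr_BE/spark_odps/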
- Create an EMR Spark node and run the node. In this example, the EMR Spark node emr_spark_odps is created. For more information about how to create an EMR Spark node, see Procedure. On the configuration tab of the emr_spark_odps node, select the desired EMR compute engine instance and write the following code:
You must replace the parameters such as <accessKeyId>, <accessKeySecret>, <envType>, <project>, <table>, and <numPartitions> with the actual information based on your business requirements.
--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
- Save and commit the EMR Spark node. For more information, see Save and commit the EMR Spark node.
