E-MapReduce (EMR) Spark nodes allow you to perform complex in-memory data analysis and help you build large-scale, low-latency data analysis applications. This topic describes how to create an EMR Spark node and provides examples that show how the features of an EMR Spark node work.

Prerequisites

The required preparations in EMR and DataWorks are complete before you create a node. The preparations vary based on the type of your EMR cluster.

Limits

  • EMR Spark nodes can be run only on an exclusive resource group for scheduling.
  • DataWorks no longer allows you to associate an EMR Hadoop cluster with a DataWorks workspace. However, EMR Hadoop clusters that are already associated with your DataWorks workspace can still be used.

Precautions

If you commit a node by using spark-submit, we recommend that you set deploy-mode to cluster rather than client.
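For reference, the following is a minimal sketch of a spark-submit invocation that follows this recommendation. The class name and JAR name are placeholders, not values from this topic:
    # Recommended: in cluster mode, the Spark driver runs inside the YARN cluster.
    spark-submit --master yarn --deploy-mode cluster --class com.example.Main example-app.jar
    # In client mode, the driver runs on the machine that commits the job, which is not recommended for scheduled nodes.
    # spark-submit --master yarn --deploy-mode client --class com.example.Main example-app.jar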

Procedure

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region where your workspace resides. Find your workspace and click DataStudio in the Actions column.
  2. Create a workflow.
    If you have an existing workflow, skip this step.
    1. Move the pointer over the Create icon and select Create Workflow.
    2. In the Create Workflow dialog box, configure the Workflow Name parameter.
    3. Click Create.
  3. Create an EMR Spark node.
    1. Move the pointer over the Create icon and choose Create Node > EMR > EMR Spark.
      You can also find the desired workflow, right-click the workflow, and then choose Create Node > EMR > EMR Spark.
    2. In the Create Node dialog box, configure the Name, Engine Instance, Node Type, and Path parameters.
      Note The name of the node must be 1 to 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
    3. Click Commit. Then, the configuration tab of the EMR Spark node appears.
  4. Create and reference an EMR JAR resource.
    If you use an EMR data lake cluster, you can perform the following steps to reference the EMR JAR resource:
    Note If an EMR Spark node depends on a large number of resources, the resources cannot be uploaded by using the DataWorks console. In this case, you can store the resources in Hadoop Distributed File System (HDFS) and then reference the resources in the code of the EMR Spark node.
    spark-submit --master yarn \
    --deploy-mode cluster \
    --name SparkPi \
    --driver-memory 4G \
    --driver-cores 1 \
    --num-executors 5 \
    --executor-memory 4G \
    --executor-cores 1 \
    --class org.apache.spark.examples.JavaSparkPi \
    hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
    1. Create an EMR JAR resource. For more information, see Create and use an EMR JAR resource. The first time you use DataWorks to access OSS, click Authorize to the right of OSS to authorize DataWorks and EMR to access OSS.
    2. Reference the EMR JAR resource.
      1. Open the EMR Spark node. The configuration tab of the node appears.
      2. Find the resource that you want to reference below Resource in the EMR folder, right-click the resource name, and then select Insert Resource Path.
      3. If a clause in the ##@resource_reference{""} format appears on the configuration tab of the EMR Spark node, the resource is referenced. Then, run the following code. You must replace the resource package name, bucket name, and directory in the code with the actual information.
        ##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
        spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster  spark-examples_2.11-2.4.0.jar 100
      Note You cannot add comments when you write code for an EMR Spark node.
  5. Configure the parameters on the Advanced Settings tab.
    The following table describes the advanced parameters that are configured for different types of EMR clusters.
    Data lake cluster
    • "queue": the scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN, see Basic queue configurations.
    • "priority": the priority. Default value: 1.
    Note
    • You can also add a custom SparkConf parameter for the EMR Spark node on the Advanced Settings tab, such as "spark.eventLog.enabled" : false. When you commit the code of the EMR Spark node, DataWorks adds the custom parameter to the code in the --conf key=value format. For an illustration, see the sketch after this table.
    • If you want to use a system parameter of Spark, such as --executor-memory 2G, in an EMR Spark node, you need to add the parameter to the code for the EMR Spark node.
    • Spark nodes can commit jobs to YARN only in cluster or local mode. Spark 2.x in cluster mode supports metadata lineage.
    Hadoop cluster
    • "queue": the scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN queues, see Basic queue configurations.
    • "vcores": the number of vCPUs. Default value: 1.
    • "memory": the memory that is allocated to the launcher. Unit: MB. Default value: 2048.
    • "priority": the priority. Default value: 1.
    • "FLOW_SKIP_SQL_ANALYZE": the manner in which SQL statements are executed. Valid values:
      • true: Multiple SQL statements are executed at a time.
      • false: Only one SQL statement is executed at a time.
    • USE_GATEWAY: specifies whether a gateway cluster is used to commit jobs on the current node. Valid values:
      • true: A gateway cluster is used to commit jobs.
      • false: No gateway cluster is used to commit jobs. Jobs are automatically committed to the master node.
      Note If the EMR cluster to which the node belongs is not associated with a gateway cluster but the USE_GATEWAY parameter is set to true, jobs may fail to be committed.
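    To make the mapping described above concrete, the following is a minimal sketch of how a custom SparkConf parameter that is set on the Advanced Settings tab is reflected in the committed command. The class name, JAR name, and the exact form of the assembled command are illustrative assumptions, not output captured from DataWorks.
    # Code written on the node configuration tab (spark-submit is added automatically):
    #   --class org.apache.spark.examples.SparkPi --master yarn-cluster spark-examples_2.11-2.4.0.jar 100
    # Custom SparkConf parameter added on the Advanced Settings tab:
    #   "spark.eventLog.enabled" : false
    # Command that is assumed to be assembled at commit time, with the custom
    # parameter appended in the --conf key=value format:
    spark-submit --conf spark.eventLog.enabled=false \
      --class org.apache.spark.examples.SparkPi --master yarn-cluster \
      spark-examples_2.11-2.4.0.jar 100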
  6. Configure scheduling properties for the EMR Spark node.
    If you want the system to periodically run the EMR Spark node, you can click Properties in the right-side navigation pane to configure scheduling properties for the node based on your business requirements.
  7. Commit and deploy the EMR Spark node.
    1. Click the Save icon in the top toolbar to save the node.
    2. Click the Submit icon in the top toolbar to commit the node.
    3. In the Commit Node dialog box, configure the Change description parameter.
    4. Click OK.
    If you use a workspace in standard mode, you must deploy the node in the production environment after you commit the node. On the left side of the top navigation bar, click Deploy. For more information, see Deploy nodes.
  8. View the EMR Spark node.
    1. Click Operation Center in the upper-right corner of the configuration tab of the EMR Spark node to go to Operation Center in the production environment.
    2. View the scheduled EMR Spark node. For more information, see View and manage auto triggered nodes.
    To view more information about the node, click Operation Center in the top navigation bar of the DataStudio page. For more information, see Overview.

Data development example 1: Test whether the EMR Spark node can be successfully run by calculating the Pi value

In this example, the SparkPi program of Spark is used to test whether the EMR Spark node can be successfully run. For more information about the sample project of Spark, see Use the sample project.

  1. Obtain the storage path of the JAR package spark-examples_2.11-2.4.5.jar of the sample project.
    Spark components are installed in the /usr/lib/spark-current path. You must log on to the master node of the desired EMR cluster in the EMR console to obtain the complete path /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar. For more information, see Common file paths.
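    To confirm the path, you can run the following commands on the master node. This is a minimal sketch that assumes the default installation path of the Spark component; the exact file name depends on the Spark version of your cluster.
    # List the sample JAR packages that are shipped with the Spark component.
    ls /usr/lib/spark-current/examples/jars/
    # The output is expected to contain a file similar to spark-examples_2.11-2.4.5.jar.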
  2. On the configuration tab of the EMR Spark node, write code for the node. For more information about how to create an EMR Spark node, see Procedure.
    In this example, the following code is used:
    --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
    You only need to enter the content that follows spark-submit. When you commit the EMR Spark node, spark-submit is automatically added. The following code shows the syntax of spark-submit and the code that is actually run:
    # spark-submit [options] --class [MainClass] xxx.jar args
    spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
  3. Save and commit the EMR Spark node. For more information, see Save and commit the EMR Spark node.
If the result 1097: Pi is roughly 3.1415547141554714 is returned, the EMR Spark node is successfully run.

Data development example 2: Use Spark to access MaxCompute

This example demonstrates the features of the EMR Spark node by using Spark to obtain the number of rows in a MaxCompute table. For more information, see Use Spark to access MaxCompute.

Before you perform the operation in this example, you must make the following preparations:
  • Prepare the environment.
    • Associate an EMR compute engine instance and a MaxCompute compute engine instance with the workspace. For more information, see Create and manage workspaces.
    • Activate OSS and create a bucket. For more information, see Create buckets.
    • Create an IntelliJ IDEA project in Scala.
  • Prepare test data.
    Create an ODPS SQL node on the DataStudio page. Then, execute SQL statements to create a MaxCompute table and insert data into the table. In this example, the following statements are executed to set the data type of the first column of the MaxCompute table to BIGINT and insert two data records into the table. For more information about how to create an ODPS SQL node, see Create an ODPS SQL node.
    DROP TABLE IF EXISTS emr_spark_read_odpstable ;
    CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable 
    (
        id BIGINT
        ,name STRING
    )
    ;
    INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi') ;
  1. Create a Spark Maven project and add Project Object Model (POM) dependencies. For more information, see Preparations.
    Add POM dependencies. The following code provides an example:
        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-maxcompute_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
    The following build configuration is used in this example. You can modify it based on your business requirements.
            <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
              
                  <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <configuration>
                        <recompileMode>incremental</recompileMode>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
  2. Use Spark to obtain the number of rows in which data is of the BIGINT type in the first column of the MaxCompute table. For more information, see Use Spark to access MaxCompute.
    The following code provides an example:
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.aliyun.emr.example.spark
    
    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkMaxComputeDemo {
      def main(args: Array[String]): Unit = {
        if (args.length < 6) {
          System.err.println(
            """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
              |
              |Arguments:
              |
              |    accessKeyId      Aliyun Access Key ID.
              |    accessKeySecret  Aliyun Key Secret.
              |    envType          0 or 1
              |                     0: Public environment.
              |                     1: Aliyun internal environment, i.e. Aliyun ECS etc.
              |    project          Aliyun ODPS project
              |    table            Aliyun ODPS table
              |    numPartitions    the number of RDD partitions
            """.stripMargin)
          System.exit(1)
        }
    
        val accessKeyId = args(0)
        val accessKeySecret = args(1)
        val envType = args(2).toInt
        val project = args(3)
        val table = args(4)
        val numPartitions = args(5).toInt
    
        val urls = Seq(
          Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment
          Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
        )
    
        val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
        val sc = new SparkContext(conf)
        val odpsOps = envType match {
          case 0 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
          case 1 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
        }
    
        val odpsData = odpsOps.readTable(project, table, read, numPartitions)
    
        println(s"Count (odpsData): ${odpsData.count()}")
      }
    
      def read(record: Record, schema: TableSchema): Long = {
        record.getBigint(0)
      }
    }
    Package the compiled code into a JAR file. In this example, the JAR package emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar is generated.
    Note The dependencies that are related to MaxCompute are provided by a third-party package. You must package these dependencies together with the compiled code into a single JAR package and upload the package to your EMR cluster.
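    For reference, the following is a minimal packaging sketch, assuming the Maven build configuration shown in the previous step:
    # Build the project; the maven-assembly-plugin produces the *-jar-with-dependencies.jar file.
    mvn clean package
    # The generated package is expected in the target directory, for example:
    #   target/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar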
  3. Upload the JAR package.
    1. Log on to the OSS console.
    2. Upload the JAR package to the specified path.
      In this example, the JAR package emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar is uploaded to the oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/ path. The first time you use DataWorks to access OSS, you must grant DataWorks the permissions to access OSS. For more information, see Create and use an EMR MR node.
      Note The upper limit for the size of EMR resources in the DataWorks console is 50 MB. In most cases, the size of a JAR package that is used to add dependencies is greater than 50 MB. Therefore, you must upload the JAR package in the OSS console. If the size of your JAR package is less than 50 MB, you can upload the package in the DataWorks console. For more information, see Create and use an EMR JAR resource.
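      If you prefer to upload the package from the command line instead of the OSS console, the following is a hedged sketch that uses the ossutil tool. This alternative is not described in this topic; it assumes that ossutil is installed and configured with credentials that can write to the bucket in this example.
      # Upload the JAR package to the OSS path that is used in this example.
      ossutil cp emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar oss://onaliyun-bucket-2/emr_BE/spark_odps/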
  4. Create an EMR Spark node and run the node.
    In this example, the EMR Spark node emr_spark_odps is created. For more information about how to create an EMR Spark node, see Procedure.
    On the configuration tab of the emr_spark_odps node, select the desired EMR compute engine instance and write the following code:
    --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
    You must replace the parameters such as <accessKeyId>, <accessKeySecret>, <envType>, <project>, <table>, and <numPartitions> with the actual information based on your business requirements.
  5. Save and commit the EMR Spark node. For more information, see Save and commit the EMR Spark node.
You can view the returned result in the run logs of the EMR Spark node. If the returned result shows that the number of data records in the MaxCompute table is 2, the EMR Spark node is successfully run.