E-MapReduce (EMR) Spark nodes allow you to perform complex in-memory data analysis and help you build large-scale, low-latency data analysis applications. This topic describes how to create an EMR Spark node and provides two examples that demonstrate the features of EMR Spark nodes.

Prerequisites

  • An Alibaba Cloud EMR cluster is created. The inbound rules of the security group to which the cluster belongs include the following rules:
    • Action: Allow
    • Protocol type: Custom TCP
    • Port range: 8898/8898
    • Authorization object: 100.104.0.0/16
  • An EMR compute engine instance is associated with the desired workspace. The EMR folder is displayed only after you associate an EMR compute engine instance with the workspace on the Workspace Management page. For more information, see Configure a workspace.
  • If you integrate Hive with Ranger in EMR, you must modify whitelist configurations and restart Hive before you develop EMR nodes in DataWorks. Otherwise, the error message Cannot modify spark.yarn.queue at runtime or Cannot modify SKYNET_BIZDATE at runtime is returned when you run EMR nodes.
    1. You can modify the whitelist configurations by using custom parameters in EMR. You can append key-value pairs to the value of a custom parameter. In this example, the custom parameter for Hive components is used. The following code provides an example:
      hive.security.authorization.sqlstd.confwhitelist.append=tez.*|spark.*|mapred.*|mapreduce.*|ALISA.*|SKYNET.*
      Note In the code, ALISA.* and SKYNET.* are configurations in DataWorks.
    2. After the whitelist configurations are modified, you must restart the Hive service to make the configurations take effect. For more information, see Restart a service.
  • An exclusive resource group for scheduling is created, and the resource group is associated with the virtual private cloud (VPC) where the EMR cluster resides. For more information, see Create and use an exclusive resource group for scheduling.
    Note You can use only exclusive resource groups for scheduling to run EMR Spark nodes.

Create an EMR Spark node

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region where your workspace resides, find the workspace, and then click Data Analytics in the Actions column.
  2. On the DataStudio page, move the pointer over the Create icon and choose EMR > EMR Spark.
    Alternatively, you can find the required workflow, right-click the workflow name, and then choose Create > EMR > EMR Spark.
  3. In the Create Node dialog box, set the Node Name and Location parameters.
    Note The node name must be 1 to 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
  4. Click Commit.
    Then, the configuration tab of the newly created EMR Spark node appears.
    Note If multiple EMR compute engine instances are associated with the current workspace, you must select an EMR compute engine instance. If only one EMR compute engine instance is associated with the current workspace, you do not need to select one.

Save and commit the EMR Spark node

  1. Save and commit the node.
    Notice You must set the Rerun and Parent Nodes parameters before you can commit the node.
    1. Click the Save icon in the toolbar to save the node.
    2. Click the Commit icon in the toolbar.
    3. In the Commit Node dialog box, enter your comments in the Change description field.
    4. Click OK.
    In a workspace in standard mode, you must click Deploy in the upper-right corner after you commit the node. For more information, see Deploy nodes.
  2. Test the node. For more information, see View auto triggered nodes.

Data development example 1: Test whether the EMR Spark node can be successfully run by calculating the Pi value

In this example, the SparkPi program of Spark is used to test whether the EMR Spark node can be successfully run. For more information about the sample project of Spark, see Use the sample project.

  1. Obtain the storage path of the JAR package spark-examples_2.11-2.4.5.jar of the sample project.
    Spark components are installed in the /usr/lib/spark-current path. You must log on to the master node of the desired EMR cluster in the EMR console to obtain the complete path /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar. For more information, see Common file paths.
  2. On the configuration tab of the EMR Spark node, write code for the node. For more information about how to create an EMR Spark node, see Create an EMR Spark node.
    The following code is used in this example:
    --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
    You need to complete only the content that follows spark-submit; the spark-submit prefix is automatically added when you commit the EMR Spark node. The following code shows the syntax of spark-submit and the code that is actually run. For reference, a minimal Scala sketch of the Monte Carlo method that the SparkPi program uses is provided at the end of this example.
    # spark-submit [options] --class [MainClass] xxx.jar args
    spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
  3. Save and commit the EMR Spark node. For more information, see Save and commit the EMR Spark node.
If the result 1097: Pi is roughly 3.1415547141554714 is returned, the EMR Spark node is successfully run.
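For reference, the SparkPi program estimates Pi by using the Monte Carlo method: it samples random points in a square and counts how many of them fall inside the inscribed circle. The following minimal Scala sketch illustrates the same idea. The sketch is for illustration only; it is not part of the Spark sample project, and the object name PiSketch is a placeholder.
    import scala.math.random

    import org.apache.spark.{SparkConf, SparkContext}

    // A minimal, illustrative sketch of the Monte Carlo approach that SparkPi uses.
    object PiSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("PiSketch")
        val sc = new SparkContext(conf)
        val slices = 100            // Same value as the last argument in the spark-submit example.
        val n = 100000 * slices     // Total number of random points to sample.
        // Sample points in the square [-1, 1] x [-1, 1] and count how many fall inside the unit circle.
        val count = sc.parallelize(1 to n, slices).map { _ =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x * x + y * y <= 1) 1 else 0
        }.reduce(_ + _)
        // The fraction of points inside the circle approximates Pi / 4.
        println(s"Pi is roughly ${4.0 * count / n}")
        sc.stop()
      }
    }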

Data development example 2: Use Spark to access MaxCompute

This example demonstrates the features of the EMR Spark node by using Spark to obtain the number of rows in a MaxCompute table. For more information, see Use Spark to access MaxCompute.

Before you perform the operation in this example, you must make the following preparations:
  • Prepare the environment.
    • Associate an EMR compute engine instance and a MaxCompute compute engine instance with the workspace. For more information, see Configure a workspace.
    • Activate Object Storage Service (OSS) and create a bucket. For more information, see Create buckets.
    • Create an IntelliJ IDEA project in Scala.
  • Prepare test data.
    Create an ODPS SQL node on the DataStudio page, and execute SQL statements to create a MaxCompute table and insert data into the table. In this example, the following statements are executed to set the data type of the first column of the MaxCompute table to BIGINT and insert two data records into the table. For more information about how to create an ODPS SQL node, see Create an ODPS SQL node.
    DROP TABLE IF EXISTS emr_spark_read_odpstable;
    CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable
    (
        id BIGINT
        ,name STRING
    );
    INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi');
  1. Create a Spark Maven project and add Project Object Model (POM) dependencies. For more information, see Preparations.
    Add POM dependencies. The following code provides an example:
        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-maxcompute_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
    The following build configuration is used in this example. You can modify it based on your business requirements.
            <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
              
                  <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <configuration>
                        <recompileMode>incremental</recompileMode>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
  2. Use Spark to obtain the number of rows in which data is of the BIGINT type in the first column of the MaxCompute table. For more information, see Use Spark to access MaxCompute.
    The following code provides an example:
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.aliyun.emr.example.spark
    
    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkMaxComputeDemo {
      def main(args: Array[String]): Unit = {
        if (args.length < 6) {
          System.err.println(
            """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
              |
              |Arguments:
              |
              |    accessKeyId      Aliyun Access Key ID.
              |    accessKeySecret  Aliyun Key Secret.
              |    envType          0 or 1
              |                     0: Public environment.
              |                     1: Aliyun internal environment, i.e. Aliyun ECS etc.
              |    project          Aliyun ODPS project
              |    table            Aliyun ODPS table
              |    numPartitions    the number of RDD partitions
            """.stripMargin)
          System.exit(1)
        }
    
        val accessKeyId = args(0)
        val accessKeySecret = args(1)
        val envType = args(2).toInt
        val project = args(3)
        val table = args(4)
        val numPartitions = args(5).toInt
    
        val urls = Seq(
          Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment
          Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
        )
    
        val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
        val sc = new SparkContext(conf)
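        // Select the MaxCompute (ODPS) endpoints that match the specified environment type.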
        val odpsOps = envType match {
          case 0 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
          case 1 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
        }
    
        // Read the MaxCompute table as an RDD by applying the read function to each record.
        val odpsData = odpsOps.readTable(project, table, read, numPartitions)
    
        println(s"Count (odpsData): ${odpsData.count()}")
      }
    
      // Transfer function that is passed to readTable: return the BIGINT value in the first column of each record.
      def read(record: Record, schema: TableSchema): Long = {
        record.getBigint(0)
      }
    }
    Compile and package the code into a JAR package. In this example, the JAR package emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar is generated.
    Note The MaxCompute-related dependencies are provided in a third-party package. You must package the third-party dependencies together with your compiled code into a single JAR package and upload the JAR package to your EMR cluster.
  3. Upload the JAR package.
    1. Log on to the OSS console.
    2. Upload the JAR package to the specified path.
      In this example, the JAR package emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar is uploaded to the oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/ path. If this is the first time you use OSS to access DataWorks, you must grant access permissions to OSS. For more information, see Create and use an EMR MR node.
      Note The upper limit for the size of EMR resources in the DataWorks console is 50 MB. In most cases, the size of a JAR package that is used to add dependencies is greater than 50 MB. Therefore, you must upload the JAR package in the OSS console. If the size of your JAR package is less than 50 MB, you can directly upload the package in the DataWorks console. For more information, see Create and use an EMR JAR resource.
  4. Create an EMR Spark node and run the node.
    In this example, the EMR Spark node emr_spark_odps is created. For more information about how to create an EMR Spark node, see Create an EMR Spark node.
    On the configuration tab of the emr_spark_odps node, select the desired EMR compute engine instance and write the following code:
    --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
    You must replace parameters such as <accessKeyId>, <accessKeySecret>, <envType>, <project>, <table>, and <numPartitions> with the actual values that you want to use.
  5. Save and commit the EMR Spark node. For more information, see Save and commit the EMR Spark node.
You can view the returned result in the operational logs of the EMR Spark node. If the returned result shows that the number of data records in the MaxCompute table is 2, the EMR Spark node is successfully run.