DataWorks的EMR(E-MapReduce) SPARK节点,用于进行复杂的内存分析,构建大型、低延迟的数据分析应用。本文为您介绍如何创建EMR Spark节点,并通过测试计算Pi及Spark对接MaxCompute两个示例,为您介绍EMR Spark节点的功能。

前提条件

EMR引擎类型包括新版数据湖(DataLake)及Hadoop,不同类型引擎创建节点前需执行的准备工作不同。您需要根据实际情况完成EMR侧及DataWorks侧的准备工作。

使用限制

  • 仅支持使用独享调度资源组运行该类型任务。
  • DataWorks目前已不支持新绑定Hadoop类型的集群,但您之前已经绑定的Hadoop集群仍然可以继续使用。

注意事项

spark-submit方式提交的任务,deploy-mode推荐使用cluster模式,不建议使用client模式。

操作步骤

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的数据开发
  2. 创建业务流程
    如果您已有业务流程,则可以忽略该步骤。
    1. 鼠标悬停至新建图标,选择新建业务流程
    2. 新建业务流程对话框,输入业务名称
    3. 单击新建
  3. 创建EMR Spark节点。
    1. 鼠标悬停至新建图标,选择新建节点 > EMR > EMR Spark
      您也可以找到相应的业务流程,右键单击业务流程,选择新建节点 > EMR > EMR Spark
    2. 新建节点对话框中,输入名称,并选择引擎实例节点类型路径
      说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
    3. 单击提交,进入EMR Spark节点编辑页面。
  4. 创建并引用EMR JAR资源。
    如果您使用的是DataLake(新版数据湖)集群,则可通过如下步骤引用EMR JAR资源。
    说明 若EMR Spark节点依赖的资源较大,则无法通过DataWorks页面上传。您可将资源存放至HDFS上,然后在代码中进行引用,代码示例如下。
    spark-submit --master yarn
    --deploy-mode cluster
    --name SparkPi
    --driver-memory 4G
    --driver-cores 1
    --num-executors 5
    --executor-memory 4G
    --executor-cores 1
    --class org.apache.spark.examples.JavaSparkPi
    hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
    1. 创建EMR JAR资源,详情请参见创建和使用EMR资源。首次使用需要进行一键授权
    2. 引用EMR JAR资源。
      1. 打开创建的EMR Spark节点,停留在代码编辑页面。
      2. EMR > 资源节点下,找到待引用资源,右键选择引用资源
      3. 选择资源后,当节点编辑页面显示##@resource_reference{""}格式的语句,表明资源引用成功。此时,需要执行下述命令。命令涉及的资源包、Bucket名称、路径信息等为本文示例的内容,使用时,请替换为实际使用的信息。
        ##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
        spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster  spark-examples_2.11-2.4.0.jar 100
      说明 EMR Spark节点编辑代码时不支持注释语句。
  5. 编辑高级设置
    不同类型EMR集群涉及配置的高级参数有差异,具体如下表。
    集群类型高级参数
    新版数据湖(DataLake)
    • “queue”:提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置
    • “priority”:优先级,默认为1。
    说明
    • 您也可以直接在高级配置里追加自定义SparkConf参数。例如, "spark.eventLog.enabled" : false ,DataWorks会自动在最终下发引擎的代码中进行补全,格式为:--conf key=value
    • 若您需要在Spark节点中使用系统参数,您需要在代码中自行添加,例如,--executor-memory 2G
    • Spark节点仅支持使用Yarn的Cluster和Local模式提交作业。Spark2的Cluster模式支持元数据血缘。
    数据湖(Hadoop)
    • “queue”:提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置
    • “vcores”: 虚拟核数,默认为1。
    • “memory”:内存,默认为2048MB(用于设置启动器Launcher的内存配额)。
    • “priority”:优先级,默认为1。
    • “FLOW_SKIP_SQL_ANALYZE”:SQL语句执行方式。取值如下:
      • true:表示每次执行多条SQL语句。
      • false:表示每次执行一条SQL语句。
    • “USE_GATEWAY”:设置本节点提交作业时,是否通过Gateway集群提交。取值如下:
      • true:通过Gateway集群提交。
      • false:不通过Gateway集群提交,默认提交到header节点。
      说明 如果本节点所在的集群未关联Gateway集群,此处手动设置参数取值为true时,后续提交EMR作业时会失败。
  6. 任务调度配置。
    如果您需要周期性执行创建的节点任务,可以单击节点编辑页面右侧的调度配置,根据业务需求配置该节点任务的调度信息:
  7. 提交并发布节点任务。
    1. 单击工具栏中的保存图标,保存节点。
    2. 单击工具栏中的提交图标,提交节点任务。
    3. 提交新版本对话框中,输入变更描述
    4. 单击确定
    如果您使用的是标准模式的工作空间,任务提交成功后,需要将任务发布至生产环境进行发布。请单击顶部菜单栏左侧的任务发布。具体操作请参见发布任务
  8. 查看周期调度任务。
    1. 单击编辑界面右上角的运维,进入生产环境运维中心。
    2. 查看运行的周期调度任务,详情请参见查看并管理周期任务
    如果您需要查看更多周期调度任务详情,可单击顶部菜单栏的运维中心,详情请参见运维中心概述

数据开发示例一:使用计算Pi测试当前EMR Spark环境是否可用

示例一以Spark自带示例项目计算Pi为例,测试当前EMR Spark环境是否可用。示例详情请参见示例项目使用说明

  1. 获取Spark自带示例的JAR包spark-examples_2.11-2.4.5.jar的存放路径。
    Spark组件安装在/usr/lib/spark-current路径下,您需要登录阿里云E-MapReduce控制台,进入目标EMR集群查询完整的路径/usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情请参见常用文档路径示例路径
  2. 在创建的EMR Spark节点编辑页面,输入运行代码。创建EMR Spark节点,详情请参见操作步骤
    示例运行代码如下。
    --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
    您仅需填写spark-submit后面的内容即可,在作业提交时会自动补全spark-submit的内容。实际执行的界面代码如下。
    # spark-submit [options] --class [MainClass] xxx.jar args
    spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
  3. 保存并提交运行节点任务,详情请参见保存并提交节点任务章节内容。
当返回结果为1097: Pi is roughly 3.1415547141554714时,表示运行成功,EMR Spark环境可用。示例一返回结果

数据开发示例二:Spark对接MaxCompute

本示例以Spark对接MaxCompute,实现通过Spark统计MaxCompute表的行数为例,为您介绍EMR Spark节点的功能应用。更多应用场景请参见EMR Spark开发指南

执行本示例前,您需要准备如下相关环境及测试数据:
  • 准备环境。
  • 准备测试数据。
    在DataWorks数据开发页面创建ODPS SQL节点,执行建表语句并插入数据。示例语句如下,设置第一列为BIGINT类型,同时,插入了两条数据记录。创建ODPS SQL节点,详情请参见创建ODPS SQL节点
    DROP TABLE IF EXISTS emr_spark_read_odpstable ;
    CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable 
    (
        id BIGINT
        ,name STRING
    )
    ;
    INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi') ;
  1. 在Spark中创建Maven工程,添加pom依赖,详情请参见Spark准备工作
    添加pom依赖,代码如下。
        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-maxcompute_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
    您可以参考如下插件代码,在实际使用中请以实际代码为准。
            <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
              
                  <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <configuration>
                        <recompileMode>incremental</recompileMode>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
  2. 在Spark中统计MaxCompute表第一列BIGINT类型的行数,详情请参见Spark对接MaxCompute
    示例代码如下。
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.aliyun.emr.example.spark
    
    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkMaxComputeDemo {
      def main(args: Array[String]): Unit = {
        if (args.length < 6) {
          System.err.println(
            """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
              |
              |Arguments:
              |
              |    accessKeyId      Aliyun Access Key ID.
              |    accessKeySecret  Aliyun Key Secret.
              |    envType          0 or 1
              |                     0: Public environment.
              |                     1: Aliyun internal environment, i.e. Aliyun ECS etc.
              |    project          Aliyun ODPS project
              |    table            Aliyun ODPS table
              |    numPartitions    the number of RDD partitions
            """.stripMargin)
          System.exit(1)
        }
    
        val accessKeyId = args(0)
        val accessKeySecret = args(1)
        val envType = args(2).toInt
        val project = args(3)
        val table = args(4)
        val numPartitions = args(5).toInt
    
        val urls = Seq(
          Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment
          Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
        )
    
        val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
        val sc = new SparkContext(conf)
        val odpsOps = envType match {
          case 0 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
          case 1 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
        }
    
        val odpsData = odpsOps.readTable(project, table, read, numPartitions)
    
        println(s"Count (odpsData): ${odpsData.count()}")
      }
    
      def read(record: Record, schema: TableSchema): Long = {
        record.getBigint(0)
      }
    }
    统计MaxCompute数据完成后,请将该数据生成JAR包。示例生成的JAR包为emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar
    说明 与ODPS相关的依赖均属于第三方包,您需要将第三方包一并生成JAR包上传至目标EMR集群。
  3. 上传运行资源。
    1. 登录OSS管控台
    2. 上传运行资源(即上一步骤生成的JAR包)至指定OSS路径。
      本示例中,使用的路径为oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/,您需要上传emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar至该路径。首次使用OSS路径时,需要先进行一键授权,详情请参见创建并使用EMR MR节点
      说明 由于DataWorks EMR资源的使用上限为50M,而添加依赖的JAR包通常大于50M,所以您需要在OSS控制台上传。如果您的运行资源小于50M,您也可以选择在DataWorks直接上传,详情请参见创建和使用EMR资源
      上传运行资源
  4. 创建EMR Spark节点,并执行节点任务。
    本示例创建的节点命名为emr_spark_odps。创建EMR Spark节点,详情请参见操作步骤
    emr_spark_odps节点的编辑页面,选择所使用的EMR引擎实例,输入如下代码。
    --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
    其中<accessKeyId> 、<accessKeySecret>、 <envType>、 <project>、 <table> 、<numPartitions>等参数信息您需要替换为实际使用的相关信息。
  5. 保存并提交运行节点任务,详情请参见保存并提交节点任务章节内容。
您可以查看运行日志,当返回结果中表记录条数为2时,表示统计结果符合预期。返回结果