DataWorks的EMR(E-MapReduce) SPARK节点,用于进行复杂的内存分析,构建大型、低延迟的数据分析应用。本文为您介绍如何创建EMR Spark节点,并通过测试计算Pi及Spark对接MaxCompute两个示例,为您介绍EMR Spark节点的功能。
前提条件
EMR引擎类型包括新版数据湖(DataLake)及Hadoop,不同类型引擎创建节点前需执行的准备工作不同。您需要根据实际情况完成EMR侧及DataWorks侧的准备工作。- DataLake:详情请参见DataLake集群配置、DataWorks配置。
- Hadoop:Hadoop集群开发前准备工作。
使用限制
- 仅支持使用独享调度资源组运行该类型任务。
- DataWorks目前已不支持新绑定Hadoop类型的集群,但您之前已经绑定的Hadoop集群仍然可以继续使用。
注意事项
spark-submit方式提交的任务,deploy-mode推荐使用cluster模式,不建议使用client模式。
操作步骤
- 进入数据开发页面。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 选择工作空间所在地域后,单击相应工作空间后的数据开发。
- 创建业务流程。如果您已有业务流程,则可以忽略该步骤。
- 鼠标悬停至
图标,选择新建业务流程。
- 在新建业务流程对话框,输入业务名称。
- 单击新建。
- 鼠标悬停至
- 创建EMR Spark节点。
- 创建并引用EMR JAR资源。如果您使用的是DataLake(新版数据湖)集群,则可通过如下步骤引用EMR JAR资源。说明 若EMR Spark节点依赖的资源较大,则无法通过DataWorks页面上传。您可将资源存放至HDFS上,然后在代码中进行引用,代码示例如下。
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
- 编辑高级设置。不同类型EMR集群涉及配置的高级参数有差异,具体如下表。
集群类型 高级参数 新版数据湖(DataLake) - “queue”:提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置。
- “priority”:优先级,默认为1。
说明- 您也可以直接在高级配置里追加自定义SparkConf参数。例如,
"spark.eventLog.enabled" : false
,DataWorks会自动在最终下发引擎的代码中进行补全,格式为:--conf key=value
。 - 若您需要在Spark节点中使用系统参数,您需要在代码中自行添加,例如,
--executor-memory 2G
。 - Spark节点仅支持使用Yarn的Cluster和Local模式提交作业。Spark2的Cluster模式支持元数据血缘。
数据湖(Hadoop) - “queue”:提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置。
- “vcores”: 虚拟核数,默认为1。
- “memory”:内存,默认为2048MB(用于设置启动器Launcher的内存配额)。
- “priority”:优先级,默认为1。
- “FLOW_SKIP_SQL_ANALYZE”:SQL语句执行方式。取值如下:
true
:表示每次执行多条SQL语句。false
:表示每次执行一条SQL语句。
- “USE_GATEWAY”:设置本节点提交作业时,是否通过Gateway集群提交。取值如下:
true
:通过Gateway集群提交。false
:不通过Gateway集群提交,默认提交到header节点。
说明 如果本节点所在的集群未关联Gateway集群,此处手动设置参数取值为true
时,后续提交EMR作业时会失败。
- 任务调度配置。如果您需要周期性执行创建的节点任务,可以单击节点编辑页面右侧的调度配置,根据业务需求配置该节点任务的调度信息:
- 配置任务调度的基本信息,详情请参见配置基础属性。
- 配置时间调度周期、重跑属性和上下游依赖关系,详情请参见时间属性配置说明及配置同周期调度依赖。说明 您需要设置节点的重跑属性和依赖的上游节点,才可以提交节点。
- 配置资源属性,详情请参见配置资源属性。访问公网或VPC网络时,请选择与目标节点网络连通的调度资源组作为周期调度任务使用的资源组。详情请参见配置资源组与网络连通。
- 提交并发布节点任务。
- 单击工具栏中的
图标,保存节点。
- 单击工具栏中的
图标,提交节点任务。
- 在提交新版本对话框中,输入变更描述。
- 单击确定。
如果您使用的是标准模式的工作空间,任务提交成功后,需要将任务发布至生产环境进行发布。请单击顶部菜单栏左侧的任务发布。具体操作请参见发布任务。 - 单击工具栏中的
- 查看周期调度任务。
- 单击编辑界面右上角的运维,进入生产环境运维中心。
- 查看运行的周期调度任务,详情请参见查看并管理周期任务。
如果您需要查看更多周期调度任务详情,可单击顶部菜单栏的运维中心,详情请参见运维中心概述。
数据开发示例一:使用计算Pi测试当前EMR Spark环境是否可用
示例一以Spark自带示例项目计算Pi为例,测试当前EMR Spark环境是否可用。示例详情请参见示例项目使用说明。
- 获取Spark自带示例的JAR包spark-examples_2.11-2.4.5.jar的存放路径。Spark组件安装在/usr/lib/spark-current路径下,您需要登录阿里云E-MapReduce控制台,进入目标EMR集群查询完整的路径/usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情请参见《常用文档路径》。
- 在创建的EMR Spark节点编辑页面,输入运行代码。创建EMR Spark节点,详情请参见操作步骤。示例运行代码如下。
--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
您仅需填写spark-submit后面的内容即可,在作业提交时会自动补全spark-submit的内容。实际执行的界面代码如下。# spark-submit [options] --class [MainClass] xxx.jar args spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
- 保存并提交运行节点任务,详情请参见《保存并提交节点任务》章节内容。
当返回结果为
1097: Pi is roughly 3.1415547141554714
时,表示运行成功,EMR Spark环境可用。
数据开发示例二:Spark对接MaxCompute
本示例以Spark对接MaxCompute,实现通过Spark统计MaxCompute表的行数为例,为您介绍EMR Spark节点的功能应用。更多应用场景请参见EMR Spark开发指南。
执行本示例前,您需要准备如下相关环境及测试数据:
- 准备环境。
- 准备测试数据。在DataWorks数据开发页面创建ODPS SQL节点,执行建表语句并插入数据。示例语句如下,设置第一列为BIGINT类型,同时,插入了两条数据记录。创建ODPS SQL节点,详情请参见创建ODPS SQL节点
DROP TABLE IF EXISTS emr_spark_read_odpstable ; CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable ( id BIGINT ,name STRING ) ; INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi') ;
- 在Spark中创建Maven工程,添加pom依赖,详情请参见Spark准备工作。添加pom依赖,代码如下。
您可以参考如下插件代码,在实际使用中请以实际代码为准。<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-maxcompute_2.11</artifactId> <version>1.9.0</version> </dependency>
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <configuration> <recompileMode>incremental</recompileMode> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> </plugins> </build>
- 在Spark中统计MaxCompute表第一列BIGINT类型的行数,详情请参见Spark对接MaxCompute。示例代码如下。
统计MaxCompute数据完成后,请将该数据生成JAR包。示例生成的JAR包为emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar。/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.aliyun.emr.example.spark import com.aliyun.odps.TableSchema import com.aliyun.odps.data.Record import org.apache.spark.aliyun.odps.OdpsOps import org.apache.spark.{SparkConf, SparkContext} object SparkMaxComputeDemo { def main(args: Array[String]): Unit = { if (args.length < 6) { System.err.println( """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions> | |Arguments: | | accessKeyId Aliyun Access Key ID. | accessKeySecret Aliyun Key Secret. | envType 0 or 1 | 0: Public environment. | 1: Aliyun internal environment, i.e. Aliyun ECS etc. | project Aliyun ODPS project | table Aliyun ODPS table | numPartitions the number of RDD partitions """.stripMargin) System.exit(1) } val accessKeyId = args(0) val accessKeySecret = args(1) val envType = args(2).toInt val project = args(3) val table = args(4) val numPartitions = args(5).toInt val urls = Seq( Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment ) val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)") val sc = new SparkContext(conf) val odpsOps = envType match { case 0 => OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1)) case 1 => OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1)) } val odpsData = odpsOps.readTable(project, table, read, numPartitions) println(s"Count (odpsData): ${odpsData.count()}") } def read(record: Record, schema: TableSchema): Long = { record.getBigint(0) } }
说明 与ODPS相关的依赖均属于第三方包,您需要将第三方包一并生成JAR包上传至目标EMR集群。 - 上传运行资源。
- 创建EMR Spark节点,并执行节点任务。本示例创建的节点命名为emr_spark_odps。创建EMR Spark节点,详情请参见操作步骤。在emr_spark_odps节点的编辑页面,选择所使用的EMR引擎实例,输入如下代码。
其中<accessKeyId> 、<accessKeySecret>、 <envType>、 <project>、 <table> 、<numPartitions>等参数信息您需要替换为实际使用的相关信息。--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
- 保存并提交运行节点任务,详情请参见《保存并提交节点任务》章节内容。
您可以查看运行日志,当返回结果中表记录条数为2时,表示统计结果符合预期。
