您可以通过创建EMR(E-MapReduce) MR节点,将一个大规模数据集拆分为多个Map任务并行处理,实现大规模数据集的并行运算。本文为您介绍如何创建EMR MR节点,并以使用MR节点实现从OSS中读取文本,统计其中单词的数量为例,为您展示EMR MR节点的作业开发流程。
前提条件
- EMR引擎类型包括新版数据湖(DataLake)及数据湖(Hadoop),不同类型引擎创建节点前需执行的准备工作不同。您需要根据实际情况完成EMR侧及DataWorks侧的准备工作,详情请参见DataLake集群配置、DataWorks配置。
- 使用EMR MR节点进行作业开发时,如果需要引用开源代码资源,您需先将开源代码作为资源上传至EMR JAR资源节点中,详情请参见创建和使用EMR资源。
- 使用EMR MR节点进行作业开发时,如果需要引用自定义函数时,您需要先将自定义函数作为资源上传至EMR JAR资源节点中,新建注册此函数,详情请参见创建EMR函数。
- 如果您使用本文的作业开发示例执行相关作业流程,则还需要创建好OSS的存储空间Bucket。创建OSS的存储空间Bucket,详情请参见控制台创建存储空间。
背景信息
本文以使用MR节点实现从OSS中读取文本,统计其中单词的数量为例,为您展示EMR MR节点的作业开发流程。涉及的文件名称、Bucket名称及路径等信息,在实际使用中,您需要替换为实际使用的相关信息。
使用限制
- 仅支持使用独享调度资源组运行该类型任务。
- DataWorks目前已不支持新绑定Hadoop类型的集群,但您之前已经绑定的Hadoop集群仍然可以继续使用。
准备初始数据及JAR资源包
- 准备初始数据。
创建
input01.txt文件,文件内容如下。
hadoop emr hadoop dw
hive hadoop
dw emr
- 创建初始数据及JAR资源的存放目录。
- 登录OSS管理控制台。
- 单击左侧导航栏的Bucket列表
- 单击目标Bucket名称,进入文件管理页面。
本文示例使用的Bucket为onaliyun-bucket-2。
- 单击新建目录,创建初始数据及JAR资源的存放目录。
- 配置目录名为emr/datas/wordcount02/inputs,创建初始数据的存放目录。
- 配置目录名为emr/jars,创建JAR资源的存放目录。
- 上传初始数据文件至初始数据的存放目录。
- 进入/emr/datas/wordcount02/inputs路径。
- 单击上传文件
- 在待上传文件区域单击扫描文件,添加input01.txt文件至Bucket。

- 单击上传文件
- 使用MapReduce读取OSS文件并生成JAR包。
- 打开已创建的IDEA项目,添加pom依赖。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.8.5</version> <!--因为EMR-MR用的是2.8.5-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
- 在MapReduce中读写OSS文件,需要配置如下参数。
重要 风险提示: 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。强烈建议不要将AccessKey ID和AccessKey Secret保存到工程代码里或者任何容易被泄露的地方,AccessKey泄露会威胁您账号下所有资源的安全。以下代码示例仅供参考,请妥善保管好您的AccessKey信息。
conf.set("fs.oss.accessKeyId", "${accessKeyId}");
conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
conf.set("fs.oss.endpoint","${endpoint}");
参数说明如下:
${accessKeyId}
:阿里云账号的AccessKey ID。${accessKeySecret}
:阿里云账号的AccessKey Secret。${endpoint}
:OSS对外服务的访问域名。由您集群所在的地域决定,对应的OSS也需要是在集群对应的地域,详情请参见访问域名和数据中心
以Java代码为例,修改Hadoop官网WordCount示例,即在代码中添加AccessKey ID和AccessKey Secret的配置,以便作业有权限访问OSS文件。
package cn.apache.hadoop.onaliyun.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class EmrWordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
conf.set("fs.oss.accessKeyId", "${accessKeyId}"); //
conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); //
conf.set("fs.oss.endpoint", "${endpoint}"); //
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(EmrWordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
- 编辑完上述Java代码后将该代码生成JAR包。示例生成的JAR包为onaliyun_mr_wordcount-1.0-SNAPSHOT.jar。
创建并使用EMR MR节点
本文以使用MR节点实现从OSS中读取文本,统计其中单词的数量为例,为您展示EMR MR节点的作业开发流程。
- 进入数据开发页面。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 选择工作空间所在地域后,单击相应工作空间后的数据开发。
- 创建业务流程。
如果您已有业务流程,则可以忽略该步骤。
- 鼠标悬停至
图标,选择新建业务流程。 - 在新建业务流程对话框,输入业务名称。
- 单击新建。
- 创建EMR MR节点。
- 鼠标悬停至
图标,选择。您也可以找到相应的业务流程,右键单击业务流程,选择。
- 在新建节点对话框中,输入名称,并选择引擎实例、节点类型及路径。
说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
- 单击提交,进入EMR MR节点编辑页面。
- 创建并引用EMR JAR资源。
说明 若EMR MR节点依赖的资源较大,则无法通过DataWorks页面上传。您可将资源存放至HDFS上,然后在代码中进行引用。代码示例如下。
spark-submit --master yarn
--deploy-mode cluster
--name SparkPi
--driver-memory 4G
--driver-cores 1
--num-executors 5
--executor-memory 4G
--executor-cores 1
--class org.apache.spark.examples.JavaSparkPi
hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
- 创建EMR JAR资源。
创建EMR JAR资源,详情请参见
创建和使用EMR资源。示例将本文
《准备初始数据及JAR资源包》种生成的JAR包存储在JAR资源的存放目录
emr/jars下。首次使用需要进行
一键授权。

- 引用EMR JAR资源。
- 打开创建的EMR MR节点,停留在代码编辑页面。
- 在节点下,找到待引用资源(示例为onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.),右键选择引用资源。

- 选择引用后,当EMR MR节点的代码编辑页面出现如下引用成功提示时,表明已成功引用代码资源。此时,需要执行下述命令。如下命令涉及的资源包、Bucket名称、路径信息等为本文示例的内容,使用时,您需要替换为实际使用的信息。
##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

- 编辑高级设置。
不同类型EMR集群涉及配置的高级参数有差异,具体如下表。
集群类型 | 高级参数 |
---|
新版数据湖(DataLake) | - “queue”:提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置。
- “priority”:优先级,默认为1。
说明 您也可以直接在高级配置里追加自定义MR任务参数。提交代码时DataWorks会自动在命令中通过-D key=value 语句加上新增的参数。 |
数据湖(Hadoop) | - “queue”:提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置。
- “vcores”: 虚拟核数,默认为1。
- “memory”:内存,默认为2048MB(用于设置启动器Launcher的内存配额)。
- “priority”:优先级,默认为1。
- “FLOW_SKIP_SQL_ANALYZE”:SQL语句执行方式。取值如下:
true :表示每次执行多条SQL语句。false :表示每次执行一条SQL语句。
- “USE_GATEWAY”:设置本节点提交作业时,是否通过Gateway集群提交。取值如下:
true :通过Gateway集群提交。false :不通过Gateway集群提交,默认提交到header节点。
说明 如果本节点所在的集群未关联Gateway集群,此处手动设置参数取值为true 时,后续提交EMR作业时会失败。
|
- 任务调度配置。
如果您需要周期性执行创建的节点任务,可以单击节点编辑页面右侧的
调度配置,根据业务需求配置该节点任务的调度信息:
- 提交并发布节点任务。
- 单击工具栏中的
图标,保存节点。 - 单击工具栏中的
图标,提交节点任务。 - 在提交新版本对话框中,输入变更描述。
- 单击确定。
如果您使用的是标准模式的工作空间,任务提交成功后,需要将任务发布至生产环境进行发布。请单击顶部菜单栏左侧的
任务发布。具体操作请参见
发布任务。
- 查看周期调度任务。
- 单击编辑界面右上角的运维,进入生产环境运维中心。
- 查看运行的周期调度任务,详情请参见查看并管理周期任务。
如果您需要查看更多周期调度任务详情,可单击顶部菜单栏的
运维中心,详情请参见
运维中心概述。
查看结果
- 登录OSS管理控制台,您可以在目标Bucket的初始数据存放目录下查看写入结果。示例路径为emr/datas/wordcount02/inputs。

- 在DataWorks读取统计结果。
- 新建EMR Hive节点,详情请参见创建EMR Hive节点。
- 在EMR Hive节点中创建挂载在OSS上的Hive外表,读取表数据。代码示例如下。
CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
(
`word` STRING COMMENT '单词',
`cout` STRING COMMENT '计数'
)
ROW FORMAT delimited fields terminated by '\t'
location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
SELECT * FROM wordcount02_result_tb;
运行结果如下图。