您可以通过创建EMR(E-MapReduce) MR节点,将一个大规模数据集拆分为多个Map任务并行处理,实现大规模数据集的并行运算。本文为您介绍如何创建EMR MR节点,并以使用MR节点实现从OSS中读取文本,统计其中单词的数量为例,为您展示EMR MR节点的作业开发流程。

前提条件

  • 您已创建阿里云EMR集群,且集群所在的安全组中入方向的安全策略包含以下策略。
    • 授权策略:允许
    • 协议类型:自定义 TCP
    • 端口范围:8898/8898
    • 授权对象:100.104.0.0/16
  • 您在工作空间配置页面添加E-MapReduce计算引擎实例后,当前页面才会显示EMR目录。详情请参见配置工作空间
  • 如果EMR启用了Ranger,则使用DataWorks进行EMR的作业开发前,您需要在EMR中修改配置,添加白名单配置并重启Hive,否则作业运行时会报错Cannot modify spark.yarn.queue at runtimeCannot modify SKYNET_BIZDATE at runtime
    1. 白名单的配置通过EMR的自定义参数,添加Key和Value进行配置,以Hive组件的配置为例,配置值如下。
      hive.security.authorization.sqlstd.confwhitelist.append=tez.*|spark.*|mapred.*|mapreduce.*|ALISA.*|SKYNET.*
      说明 其中ALISA.*SKYNET.*为DataWorks专有的配置。
    2. 白名单配置完成后需要重启服务,重启后配置才会生效。重启服务的操作详情请参见重启服务
  • 已开通独享调度资源组,并且独享调度资源组需要绑定EMR所在的VPC专有网络,详情请参见新增和使用独享调度资源组
    说明 仅支持使用独享调度资源组运行该类型任务。
  • 使用EMR MR节点进行作业开发时,如果需要引用开源代码资源,您需先将开源代码作为资源上传至EMR JAR资源节点中,详情请参见创建和使用EMR JAR资源
  • 使用EMR MR节点进行作业开发时,如果需要引用自定义函数时,您需要先将自定义函数作为资源上传至EMR JAR资源节点中,新建注册此函数,详情请参见注册EMR函数
  • 如果您使用本文的作业开发示例执行相关作业流程,则还需要完成如下操作:
    • 已创建OSS的存储空间Bucket,详情请参见创建存储空间
    • 已创建好IDEA项目。

背景信息

本文以使用MR节点实现从OSS中读取文本,统计其中单词的数量为例,为您展示EMR MR节点的作业开发流程。涉及的文件名称、Bucket名称及路径等信息,在实际使用中,您需要替换为实际使用的相关信息。

准备初始数据及JAR资源包

  1. 准备初始数据。
    创建input01.txt文件,文件内容如下。
    hadoop emr hadoop dw
    hive hadoop
    dw emr
  2. 创建初始数据及JAR资源的存放目录。
    1. 登录OSS管理控制台
    2. 单击左侧导航栏的Bucket列表
    3. 单击目标Bucket名称,进入文件管理页面。
      本文示例使用的Bucket为onaliyun-bucket-2
    4. 单击新建目录,创建初始数据及JAR资源的存放目录。
      • 配置目录名emr/datas/wordcount02/inputs,创建初始数据的存放目录。
      • 配置目录名emr/jars,创建JAR资源的存放目录。
    5. 上传初始数据文件至初始数据的存放目录。
      1. 进入/emr/datas/wordcount02/inputs路径。
      2. 单击上传文件
      3. 待上传文件区域单击扫描文件,添加input01.txt文件至Bucket。上传文件
      4. 单击上传文件
  3. 使用MapReduce读取OSS文件并生成JAR包。
    1. 打开已创建的IDEA项目,添加pom依赖。
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-mapreduce-client-common</artifactId>
                  <version>2.8.5</version> <!--因为EMR-MR用的是2.8.5-->
              </dependency>
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-common</artifactId>
                  <version>2.8.5</version>
              </dependency>
    2. 在MapReduce中读写OSS文件,需要配置如下参数。
      conf.set("fs.oss.accessKeyId", "${accessKeyId}");
      conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
      conf.set("fs.oss.endpoint","${endpoint}");
      参数说明如下:
      • ${accessKeyId}:阿里云账号的AccessKey ID。
      • ${accessKeySecret}:阿里云账号的AccessKey Secret。
      • ${endpoint}:OSS对外服务的访问域名。由您集群所在的地域决定,对应的OSS也需要是在集群对应的地域,详情请参见访问域名和数据中心
      以Java代码为例,修改Hadoop官网WordCount示例,即在代码中添加AccessKey ID和AccessKey Secret的配置,以便作业有权限访问OSS文件。
      package cn.apache.hadoop.onaliyun.examples;
      
      import java.io.IOException;
      import java.util.StringTokenizer;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
      
      public class EmrWordCount {
          public static class TokenizerMapper
                  extends Mapper<Object, Text, Text, IntWritable> {
              private final static IntWritable one = new IntWritable(1);
              private Text word = new Text();
      
              public void map(Object key, Text value, Context context
              ) throws IOException, InterruptedException {
                  StringTokenizer itr = new StringTokenizer(value.toString());
                  while (itr.hasMoreTokens()) {
                      word.set(itr.nextToken());
                      context.write(word, one);
                  }
              }
          }
      
          public static class IntSumReducer
                  extends Reducer<Text, IntWritable, Text, IntWritable> {
              private IntWritable result = new IntWritable();
      
              public void reduce(Text key, Iterable<IntWritable> values,
                                 Context context
              ) throws IOException, InterruptedException {
                  int sum = 0;
                  for (IntWritable val : values) {
                      sum += val.get();
                  }
                  result.set(sum);
                  context.write(key, result);
              }
          }
      
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
              if (otherArgs.length < 2) {
                  System.err.println("Usage: wordcount <in> [<in>...] <out>");
                  System.exit(2);
              }
              conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
              conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
              conf.set("fs.oss.endpoint", "${endpoint}"); //
              Job job = Job.getInstance(conf, "word count");
              job.setJarByClass(EmrWordCount.class);
              job.setMapperClass(TokenizerMapper.class);
              job.setCombinerClass(IntSumReducer.class);
              job.setReducerClass(IntSumReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              for (int i = 0; i < otherArgs.length - 1; ++i) {
                  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
              }
              FileOutputFormat.setOutputPath(job,
                      new Path(otherArgs[otherArgs.length - 1]));
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                                      
    3. 编辑完上述Java代码后将该代码生成JAR包。示例生成的JAR包为onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.

创建并使用EMR MR节点

本文以使用MR节点实现从OSS中读取文本,统计其中单词的数量为例,为您展示EMR MR节点的作业开发流程。

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的进入数据开发
  2. 鼠标悬停至新建图标,单击EMR > EMR MR
    您也可以找到相应的业务流程,右键单击EMR,选择新建 > EMR MR
  3. 新建节点对话框中,输入节点名称,并选择目标文件夹
    说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
  4. 单击提交
  5. 创建EMR JAR资源。
    创建EMR JAR资源,详情请参见创建和使用EMR JAR资源。示例将上述步骤生成的JAR包存储在JAR资源的存放目录emr/jars下。首次使用需要进行一键授权新建JAR资源
  6. 引用EMR JAR资源。
    1. 打开创建的EMR MR节点,停留在代码编辑页面。
    2. EMR > 资源节点下,找到待引用资源(示例为onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.),右键选择引用资源
      引用资源
    3. 选择引用后,当EMR MR节点的代码编辑页面出现如下引用成功提示时,表明已成功引用代码资源。此时,需要执行下述命令。
      如下命令涉及的资源包、Bucket名称、路径信息等为本文示例的内容,使用时,您需要替换为实际使用的信息。
      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      资源引用成功
  7. 保存并提交节点。
    注意 您需要设置节点的重跑属性依赖的上游节点,才可以提交节点。
    1. 单击工具栏中的保存图标,保存节点。
    2. 单击工具栏中的提交图标。
    3. 提交新版本对话框中,输入变更描述
    4. 单击确认
    如果您使用的是标准模式的工作空间,提交成功后,请单击右上方的发布。具体操作请参见发布任务

查看结果

  • 登录OSS管理控制台,您可以在目标Bucket的初始数据存放目录下查看写入结果。示例路径为emr/datas/wordcount02/inputs目标Bucket
  • 在DataWorks读取统计结果。
    1. 新建EMR Hive节点,详情请参见创建EMR Hive节点
    2. 在EMR Hive节点中创建挂载在OSS上的Hive外表,读取表数据。代码示例如下。
      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT '单词',
          `cout` STRING COMMENT '计数'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;
      运行结果如下图。运行结果