E-MapReduce: MapReduce Development Guide

Last updated: Jul 01, 2024

This topic uses an EMR-3.27.0 cluster as an example to describe how to develop MapReduce (MR) jobs in an E-MapReduce cluster.

Use OSS in MapReduce

To read data from and write data to OSS in MapReduce, configure the following parameters.

Note

Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the environment where your code runs. For information about how to configure them, see Configuration scheme.

String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
conf.set("fs.oss.accessKeyId", accessKeyId);
conf.set("fs.oss.accessKeySecret", accessKeySecret);
conf.set("fs.oss.endpoint", "${endpoint}");

Parameters:

  • accessKeyId: the AccessKey ID of your Alibaba Cloud account.

  • accessKeySecret: the AccessKey Secret of your Alibaba Cloud account.

  • ${endpoint}: the endpoint that is used to access OSS.

    The endpoint depends on the region in which your cluster resides, and the OSS bucket must be in the same region as the cluster. For details, see Regions and endpoints.
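
To tie these parameters together, the following minimal sketch configures OSS access on a Hadoop Configuration and lists a directory to verify that the credentials and endpoint work. It is illustrative only: the bucket path is hypothetical, ${endpoint} must be replaced with your region's endpoint, and it assumes the OSS FileSystem implementation is on the classpath, as it is on EMR cluster nodes.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OssConfigCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Credentials are read from environment variables, as described above.
        conf.set("fs.oss.accessKeyId", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        conf.set("fs.oss.accessKeySecret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        conf.set("fs.oss.endpoint", "${endpoint}"); // replace with your region's endpoint
        // Listing a directory is a quick way to confirm that the configuration works.
        // The path below is a hypothetical example.
        Path dir = new Path("oss://<yourBucketName>/data/");
        FileSystem fs = FileSystem.get(URI.create(dir.toString()), conf);
        for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(status.getPath());
        }
    }
}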

WordCount example

This example shows how a MapReduce job, launched from the master node, reads text from OSS, counts the words in the text, and writes the results back to OSS.

  1. Log on to the cluster in SSH mode. For details, see Log on to a cluster.

  2. Run the following command to create a directory named wordcount_classes:

    mkdir wordcount_classes
  3. Create a file named EmrWordCount.java.

    1. Run the following command to open the EmrWordCount.java file:

      vim EmrWordCount.java
    2. Press the i key to enter edit mode.

    3. Add the following content to the EmrWordCount.java file:

      package org.apache.hadoop.examples;

      import java.io.IOException;
      import java.util.StringTokenizer;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;

      public class EmrWordCount {
          public static class TokenizerMapper
                  extends Mapper<Object, Text, Text, IntWritable> {
              private final static IntWritable one = new IntWritable(1);
              private Text word = new Text();

              public void map(Object key, Text value, Context context
                              ) throws IOException, InterruptedException {
                  StringTokenizer itr = new StringTokenizer(value.toString());
                  while (itr.hasMoreTokens()) {
                      word.set(itr.nextToken());
                      context.write(word, one);
                  }
              }
          }

          public static class IntSumReducer
                  extends Reducer<Text, IntWritable, Text, IntWritable> {
              private IntWritable result = new IntWritable();

              public void reduce(Text key, Iterable<IntWritable> values,
                                 Context context
                                 ) throws IOException, InterruptedException {
                  int sum = 0;
                  for (IntWritable val : values) {
                      sum += val.get();
                  }
                  result.set(sum);
                  context.write(key, result);
              }
          }

          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
              if (otherArgs.length < 2) {
                  System.err.println("Usage: wordcount <in> [<in>...] <out>");
                  System.exit(2);
              }
              // Read the AccessKey pair from the environment variables configured earlier.
              String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
              String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
              conf.set("fs.oss.accessKeyId", accessKeyId);
              conf.set("fs.oss.accessKeySecret", accessKeySecret);
              conf.set("fs.oss.endpoint", "${endpoint}");
              Job job = Job.getInstance(conf, "word count");
              job.setJarByClass(EmrWordCount.class);
              job.setMapperClass(TokenizerMapper.class);
              job.setCombinerClass(IntSumReducer.class);
              job.setReducerClass(IntSumReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              for (int i = 0; i < otherArgs.length - 1; ++i) {
                  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
              }
              FileOutputFormat.setOutputPath(job,
                  new Path(otherArgs[otherArgs.length - 1]));
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
    4. Press the Esc key to exit edit mode, then enter :wq to save and close the file.

  4. Compile and package the file.

    1. Run the following command to compile the code:

      javac -classpath <HADOOP_HOME>/share/hadoop/common/hadoop-common-X.X.X.jar:<HADOOP_HOME>/share/hadoop/mapreduce/hadoop-mapreduce-client-core-X.X.X.jar:<HADOOP_HOME>/share/hadoop/common/lib/commons-cli-1.2.jar -d wordcount_classes EmrWordCount.java
      • HADOOP_HOME: the installation directory of Hadoop, which is usually /usr/lib/hadoop-current.

        You can also run the env | grep hadoop command to find the installation directory.

      • X.X.X: the version number of the JAR package. Change it based on the Hadoop version of your cluster.

        You can find hadoop-common-X.X.X.jar in the <HADOOP_HOME>/share/hadoop/common/ directory and hadoop-mapreduce-client-core-X.X.X.jar in the <HADOOP_HOME>/share/hadoop/mapreduce/ directory.

    2. Run the following command to package the classes into a JAR file:

      jar cvf wordcount.jar -C wordcount_classes .
      Note

      In this example, the packaged wordcount.jar file is stored in the /root directory by default.

  5. Create a job.

    1. Upload the wordcount.jar file generated in Step 4 to OSS. For details, see Upload objects in the console.

      For example, the path of the JAR file in OSS is oss://<yourBucketName>/jars/wordcount.jar.

    2. Create an MR job in E-MapReduce. For details, see Configure a Hadoop MapReduce job.

      The job content is as follows:

      ossref://<yourBucketName>/jars/wordcount.jar org.apache.hadoop.examples.EmrWordCount oss://<yourBucketName>/data/WordCount/Input oss://<yourBucketName>/data/WordCount/Output

      Replace <yourBucketName> with the name of your OSS bucket. oss://<yourBucketName>/data/WordCount/Input and oss://<yourBucketName>/data/WordCount/Output are the input and output paths, respectively.

    3. On the job editing page, click Run.

      The MR job then runs on the specified cluster.

WordCount2 example

If your project is large, you can use a project management tool such as Maven. This example shows how to manage an MR job by using a Maven project.

  1. Install Maven and Java on your on-premises machine.

    In this example, Maven 3.0 and Java 1.8 are used.

  2. Run the following command to generate the project skeleton.

    For example, your project root directory is D:/workspace.

    mvn archetype:generate -DgroupId=com.aliyun.emr.hadoop.examples -DartifactId=wordcountv2 -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

    The command generates an empty sample project in D:/workspace/wordcountv2 (the directory name matches the artifactId you specified). The project contains a simple pom.xml file and an App class whose package path matches the groupId you specified.

  3. Add the Hadoop dependencies.

    Open the sample project in an IDE and edit the pom.xml file. If the Hadoop version is 2.8.5, add the following content:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>
  4. Write the code.

    1. Add a new class named EMapReduceOSSUtil in the com.aliyun.emr.hadoop.examples package, at the same level as the App class. A short usage sketch follows the class.

      package com.aliyun.emr.hadoop.examples;

      import org.apache.hadoop.conf.Configuration;

      public class EMapReduceOSSUtil {
          private static String SCHEMA = "oss://";
          private static String EPSEP = ".";
          private static String HTTP_HEADER = "http://";

          /**
           * Complete an OSS URI:
           * convert a URI such as oss://bucket/path to oss://bucket.endpoint/path.
           * ossref URIs do not need this.
           *
           * @param oriUri original OSS URI
           */
          public static String buildOSSCompleteUri(String oriUri, String endpoint) {
              if (endpoint == null) {
                  System.err.println("missing endpoint");
                  return oriUri;
              }
              // The URI must start with the oss:// schema.
              int index = oriUri.indexOf(SCHEMA);
              if (index == -1 || index != 0) {
                  return oriUri;
              }
              // Extract the bucket name between the schema and the first "/".
              int bucketIndex = index + SCHEMA.length();
              int pathIndex = oriUri.indexOf("/", bucketIndex);
              String bucket = null;
              if (pathIndex == -1) {
                  bucket = oriUri.substring(bucketIndex);
              } else {
                  bucket = oriUri.substring(bucketIndex, pathIndex);
              }
              // Rebuild the URI as oss://bucket.endpoint/path.
              StringBuilder retUri = new StringBuilder();
              retUri.append(SCHEMA)
                      .append(bucket)
                      .append(EPSEP)
                      .append(stripHttp(endpoint));
              if (pathIndex > 0) {
                  retUri.append(oriUri.substring(pathIndex));
              }
              return retUri.toString();
          }

          public static String buildOSSCompleteUri(String oriUri, Configuration conf) {
              return buildOSSCompleteUri(oriUri, conf.get("fs.oss.endpoint"));
          }

          private static String stripHttp(String endpoint) {
              if (endpoint.startsWith(HTTP_HEADER)) {
                  return endpoint.substring(HTTP_HEADER.length());
              }
              return endpoint;
          }
      }
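
      You can sanity-check the utility locally. A minimal sketch, with a hypothetical bucket name and endpoint:

      // Hypothetical bucket and endpoint, for illustration only.
      public class EMapReduceOSSUtilDemo {
          public static void main(String[] args) {
              // oss://bucket/path + endpoint -> oss://bucket.endpoint/path
              System.out.println(EMapReduceOSSUtil.buildOSSCompleteUri(
                      "oss://mybucket/data/input.txt", "oss-cn-hangzhou.aliyuncs.com"));
              // Prints: oss://mybucket.oss-cn-hangzhou.aliyuncs.com/data/input.txt
          }
      }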
    2. com.aliyun.emr.hadoop.examples中和App類平行的位置添加新類WordCount2.java

       package com.aliyun.emr.hadoop.examples;

       import java.io.BufferedReader;
       import java.io.FileReader;
       import java.io.IOException;
       import java.net.URI;
       import java.util.ArrayList;
       import java.util.HashSet;
       import java.util.List;
       import java.util.Set;
       import java.util.StringTokenizer;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.Path;
       import org.apache.hadoop.io.IntWritable;
       import org.apache.hadoop.io.Text;
       import org.apache.hadoop.mapreduce.Counter;
       import org.apache.hadoop.mapreduce.Job;
       import org.apache.hadoop.mapreduce.Mapper;
       import org.apache.hadoop.mapreduce.Reducer;
       import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
       import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
       import org.apache.hadoop.util.GenericOptionsParser;
       import org.apache.hadoop.util.StringUtils;
       public class WordCount2 {
           public static class TokenizerMapper
                   extends Mapper<Object, Text, Text, IntWritable>{
               static enum CountersEnum { INPUT_WORDS }
               private final static IntWritable one = new IntWritable(1);
               private Text word = new Text();
               private boolean caseSensitive;
               private Set<String> patternsToSkip = new HashSet<String>();
               private Configuration conf;
               private BufferedReader fis;
               @Override
               public void setup(Context context) throws IOException,
                       InterruptedException {
                   conf = context.getConfiguration();
                   caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
                   if (conf.getBoolean("wordcount.skip.patterns", true)) {
                       URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
                       for (URI patternsURI : patternsURIs) {
                           Path patternsPath = new Path(patternsURI.getPath());
                           String patternsFileName = patternsPath.getName().toString();
                           parseSkipFile(patternsFileName);
                       }
                   }
               }
               private void parseSkipFile(String fileName) {
                   try {
                       fis = new BufferedReader(new FileReader(fileName));
                       String pattern = null;
                       while ((pattern = fis.readLine()) != null) {
                           patternsToSkip.add(pattern);
                       }
                   } catch (IOException ioe) {
                       System.err.println("Caught exception while parsing the cached file '"
                               + StringUtils.stringifyException(ioe));
                   }
               }
               @Override
               public void map(Object key, Text value, Context context
               ) throws IOException, InterruptedException {
                   String line = (caseSensitive) ?
                           value.toString() : value.toString().toLowerCase();
                   for (String pattern : patternsToSkip) {
                       line = line.replaceAll(pattern, "");
                   }
                   StringTokenizer itr = new StringTokenizer(line);
                   while (itr.hasMoreTokens()) {
                       word.set(itr.nextToken());
                       context.write(word, one);
                       Counter counter = context.getCounter(CountersEnum.class.getName(),
                               CountersEnum.INPUT_WORDS.toString());
                       counter.increment(1);
                   }
               }
           }
           public static class IntSumReducer
                   extends Reducer<Text,IntWritable,Text,IntWritable> {
               private IntWritable result = new IntWritable();
               public void reduce(Text key, Iterable<IntWritable> values,
                                  Context context
               ) throws IOException, InterruptedException {
                   int sum = 0;
                   for (IntWritable val : values) {
                       sum += val.get();
                   }
                   result.set(sum);
                   context.write(key, result);
               }
           }
            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                // Read the AccessKey pair from the environment variables configured earlier.
                String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
                String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
                conf.set("fs.oss.accessKeyId", accessKeyId);
                conf.set("fs.oss.accessKeySecret", accessKeySecret);
                conf.set("fs.oss.endpoint", "${endpoint}");
                GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
                String[] remainingArgs = optionParser.getRemainingArgs();
                // Expect either <in> <out> or <in> <out> -skip <skipPatternFile>.
                if (remainingArgs.length != 2 && remainingArgs.length != 4) {
                    System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
                    System.exit(2);
                }
               Job job = Job.getInstance(conf, "word count");
               job.setJarByClass(WordCount2.class);
               job.setMapperClass(TokenizerMapper.class);
               job.setCombinerClass(IntSumReducer.class);
               job.setReducerClass(IntSumReducer.class);
               job.setOutputKeyClass(Text.class);
               job.setOutputValueClass(IntWritable.class);
               List<String> otherArgs = new ArrayList<String>();
               for (int i=0; i < remainingArgs.length; ++i) {
                   if ("-skip".equals(remainingArgs[i])) {
                       job.addCacheFile(new Path(EMapReduceOSSUtil.buildOSSCompleteUri(remainingArgs[++i], conf)).toUri());
                       job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
                   } else {
                       otherArgs.add(remainingArgs[i]);
                   }
               }
               FileInputFormat.addInputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(0), conf)));
               FileOutputFormat.setOutputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(1), conf)));
               System.exit(job.waitForCompletion(true) ? 0 : 1);
           }
       }
  5. In the project directory, run the following command to compile and package the files:

    mvn clean package -DskipTests

    You can find a JAR package named wordcountv2-1.0-SNAPSHOT.jar in the target directory of the project.

  6. Create a job.

    1. Upload the wordcountv2-1.0-SNAPSHOT.jar file generated in Step 5 to OSS. For details, see Upload objects in the console.

      For example, the path of the JAR file in OSS is oss://<yourBucketName>/jars/wordcountv2-1.0-SNAPSHOT.jar.

    2. Download the following files and upload them to the corresponding directory in your OSS bucket.

      Note

      The_Sorrows_of_Young_Werther.txt is the text file whose words are counted, and patterns.txt specifies the words to be ignored (excluded from the counts).
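
      For illustration, a patterns.txt file for this job might contain lines such as the following. This content is an assumption modeled on the classic Hadoop WordCount2 tutorial, not a file shipped with EMR. Each line is a regular expression that is removed from the input before counting; the first three strip punctuation, and the last drops the word "to":

      \.
      \,
      \!
      to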

    3. Create an MR job in E-MapReduce. For details, see Configure a Hadoop MapReduce job.

      The job content is as follows:

      ossref://<yourBucketName>/jars/wordcountv2-1.0-SNAPSHOT.jar com.aliyun.emr.hadoop.examples.WordCount2 -D wordcount.case.sensitive=true oss://<yourBucketName>/jars/The_Sorrows_of_Young_Werther.txt oss://<yourBucketName>/jars/output -skip oss://<yourBucketName>/jars/patterns.txt

      Replace <yourBucketName> with the name of your OSS bucket. The output path is oss://<yourBucketName>/jars/output.

    4. On the job editing page, click Run.

      The MR job then runs on the specified cluster.