This topic uses an E-MapReduce (EMR) V3.27.0 cluster as an example to show how to develop and run a MapReduce job in an EMR cluster.

Process OSS data by using a MapReduce job

To read data from or write data to Object Storage Service (OSS) by using a MapReduce job, you must configure the following parameters:
conf.set("fs.oss.accessKeyId", "${accessKeyId}");
conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
conf.set("fs.oss.endpoint","${endpoint}");
Parameter description:
  • ${accessKeyId}: the AccessKey ID of your Alibaba Cloud account.
  • ${accessKeySecret}: the AccessKey secret of your Alibaba Cloud account.
  • ${endpoint}: the endpoint of OSS.

    Set this parameter to the endpoint that corresponds to the region where your EMR cluster resides. You must make sure that the OSS bucket you want to use resides in the same region as the EMR cluster. For more information about regions and endpoints, see Regions and endpoints.
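    For example, for a cluster in the China (Hangzhou) region, the configuration might look as follows. The values shown are hypothetical placeholders; substitute your own AccessKey pair and your region's endpoint:

    ```java
    conf.set("fs.oss.accessKeyId", "LTAI****************");           // your AccessKey ID
    conf.set("fs.oss.accessKeySecret", "****************************"); // your AccessKey secret
    conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");       // endpoint of the cluster's region
    ```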

WordCount example

This example demonstrates how to use a MapReduce job to read text from OSS, count the number of occurrences of each word in the text, and then write the results back to the OSS bucket.

  1. Log on to your cluster in SSH mode. For more information, see Connect to the master node of an EMR cluster in SSH mode.
  2. Run the following command to create a directory named wordcount_classes:
    mkdir wordcount_classes
  3. Create a file named EmrWordCount.java:
    1. Run the following command to create a file named EmrWordCount.java and open the file:
      vim EmrWordCount.java
    2. Press the I key to switch to the edit mode.
    3. Add the following information to the EmrWordCount.java file:
      package org.apache.hadoop.examples;
       import java.io.IOException;
       import java.util.StringTokenizer;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.Path;
       import org.apache.hadoop.io.IntWritable;
       import org.apache.hadoop.io.Text;
       import org.apache.hadoop.mapreduce.Job;
       import org.apache.hadoop.mapreduce.Mapper;
       import org.apache.hadoop.mapreduce.Reducer;
       import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
       import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
       import org.apache.hadoop.util.GenericOptionsParser;
       public class EmrWordCount {
        public static class TokenizerMapper
              extends Mapper<Object, Text, Text, IntWritable>{
           private final static IntWritable one = new IntWritable(1);
           private Text word = new Text();
           public void map(Object key, Text value, Context context
                           ) throws IOException, InterruptedException {
             StringTokenizer itr = new StringTokenizer(value.toString());
             while (itr.hasMoreTokens()) {
               word.set(itr.nextToken());
               context.write(word, one);
             }
           }
         }
         public static class IntSumReducer
              extends Reducer<Text,IntWritable,Text,IntWritable> {
           private IntWritable result = new IntWritable();
           public void reduce(Text key, Iterable<IntWritable> values,
                              Context context
                              ) throws IOException, InterruptedException {
             int sum = 0;
             for (IntWritable val : values) {
               sum += val.get();
             }
             result.set(sum);
             context.write(key, result);
           }
         }
         public static void main(String[] args) throws Exception {
           Configuration conf = new Configuration();
           String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
           if (otherArgs.length < 2) {
             System.err.println("Usage: wordcount <in> [<in>...] <out>");
             System.exit(2);
           }
           conf.set("fs.oss.accessKeyId", "${accessKeyId}");
           conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
           conf.set("fs.oss.endpoint","${endpoint}");
           Job job = Job.getInstance(conf, "word count");
           job.setJarByClass(EmrWordCount.class);
           job.setMapperClass(TokenizerMapper.class);
           job.setCombinerClass(IntSumReducer.class);
           job.setReducerClass(IntSumReducer.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(IntWritable.class);
           for (int i = 0; i < otherArgs.length - 1; ++i) {
             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
           }
           FileOutputFormat.setOutputPath(job,
             new Path(otherArgs[otherArgs.length - 1]));
           System.exit(job.waitForCompletion(true) ? 0 : 1);
         }
       }
    4. Press Esc to exit the edit mode. Then, enter :wq to save and close the file.
  4. Compile and package the created file.
    1. Run the following command to compile the program:
      javac -classpath <HADOOP_HOME>/share/hadoop/common/hadoop-common-X.X.X.jar:<HADOOP_HOME>/share/hadoop/mapreduce/hadoop-mapreduce-client-core-X.X.X.jar:<HADOOP_HOME>/share/hadoop/common/lib/commons-cli-1.2.jar -d wordcount_classes EmrWordCount.java
      • HADOOP_HOME: the installation directory of Hadoop. In most cases, the /usr/lib/hadoop-current directory is used.

        You can run the env | grep hadoop command to obtain the installation directory.

      • X.X.X: the version of the JAR package. It must be the same as the version of Hadoop in the cluster.

        For the hadoop-common-X.X.X.jar file, you can view the version in the <HADOOP_HOME>/share/hadoop/common/ directory. For the hadoop-mapreduce-client-core-X.X.X.jar file, you can view the version in the <HADOOP_HOME>/share/hadoop/mapreduce/ directory.

    2. Run the following command to package the compiled program into a JAR file:
      jar cvf wordcount.jar -C wordcount_classes .
      Note In this example, the JAR file is named wordcount.jar and is saved in the current working directory (/root).
  5. Create a job.
    1. Upload the wordcount.jar file obtained in Step 4 to OSS. For more information, see Upload objects.
      In this example, the file is uploaded to oss://<yourBucketName>/jars/wordcount.jar.
    2. Create a MapReduce job in the EMR console. For more information, see Configure a Hadoop MapReduce job.
      Job content:
      ossref://<yourBucketName>/jars/wordcount.jar org.apache.hadoop.examples.EmrWordCount oss://<yourBucketName>/data/WordCount/Input oss://<yourBucketName>/data/WordCount/Output

      Replace <yourBucketName> in the code with the name of the OSS bucket that you use. oss://<yourBucketName>/data/WordCount/Input indicates the input path and oss://<yourBucketName>/data/WordCount/Output indicates the output path.

    3. On the Edit Job page, click Run.
      The MapReduce job starts to run in the cluster.
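To sanity-check what the job computes, the mapper and reducer logic above can be sketched locally without Hadoop. The following helper class is illustrative only and is not part of the job: the mapper emits (word, 1) per token, and the reducer sums the counts per word.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {
    // Local, single-process equivalent of the map and reduce phases.
    public static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        // Same whitespace tokenization as TokenizerMapper.
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            // merge() plays the role of IntSumReducer: sum the 1s per word.
            counts.merge(itr.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("emr oss emr")); // prints {emr=2, oss=1}
    }
}
```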

Wordcount2 example

If your project is large, you can use Maven or a similar tool to manage jobs in the project. This example demonstrates how to use Maven to manage a MapReduce job.

  1. Install Maven and Java on your on-premises machine.
    In this example, Maven 3.0 and Java 1.8 are used.
  2. Run the following command to generate a project skeleton:
    In this example, the root directory to develop a project is D:/workspace.
    mvn archetype:generate -DgroupId=com.aliyun.emr.hadoop.examples -DartifactId=wordcountv2 -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

    After you run the command, an empty sample project is automatically generated in the D:/workspace/wordcountv2 directory, which is consistent with the specified artifactId. The project contains a file named pom.xml and a class named App. The package path of the App class is consistent with the specified groupId.

  3. Add Hadoop dependencies.
    Use an integrated development environment (IDE) to open the sample project and edit the pom.xml file. If Hadoop 2.8.5 is used, add the following content to the file:
    <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-common</artifactId>
             <version>2.8.5</version>
    </dependency>
    <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>2.8.5</version>
    </dependency>
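    An EMR cluster already ships these Hadoop libraries, so you can optionally add `<scope>provided</scope>` to each dependency to keep them out of your packaged JAR. This is a common Maven convention, not a requirement of this example:

    ```xml
    <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>2.8.5</version>
             <scope>provided</scope>
    </dependency>
    ```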
  4. Write code.
    1. Add a new class named EMapReduceOSSUtil at the same directory level as the App class in the com.aliyun.emr.hadoop.examples package.
      package com.aliyun.emr.hadoop.examples;
       import org.apache.hadoop.conf.Configuration;
       public class EMapReduceOSSUtil {
           private static final String SCHEMA = "oss://";
           private static final String EPSEP = ".";
           private static final String HTTP_HEADER = "http://";
           /**
            * Completes an OSS URI by converting a URI such as oss://bucket/path
            * to oss://bucket.endpoint/path. ossref URIs do not need this conversion.
            *
            * @param oriUri the original OSS URI
            * @param endpoint the OSS endpoint to insert after the bucket name
            */
           public static String buildOSSCompleteUri(String oriUri, String endpoint) {
               if (endpoint == null) {
                   System.err.println("missing endpoint");
                   return oriUri;
               }
               int index = oriUri.indexOf(SCHEMA);
               if (index != 0) { // covers both "not found" (-1) and "not a prefix"
                   return oriUri;
               }
               int bucketIndex = index + SCHEMA.length();
               int pathIndex = oriUri.indexOf("/", bucketIndex);
               String bucket = null;
               if (pathIndex == -1) {
                   bucket = oriUri.substring(bucketIndex);
               } else {
                   bucket = oriUri.substring(bucketIndex, pathIndex);
               }
               StringBuilder retUri = new StringBuilder();
               retUri.append(SCHEMA)
                       .append(bucket)
                       .append(EPSEP)
                       .append(stripHttp(endpoint));
               if (pathIndex > 0) {
                   retUri.append(oriUri.substring(pathIndex));
               }
               return retUri.toString();
           }
           public static String buildOSSCompleteUri(String oriUri, Configuration conf) {
               return buildOSSCompleteUri(oriUri, conf.get("fs.oss.endpoint"));
           }
           private static String stripHttp(String endpoint) {
               if (endpoint.startsWith(HTTP_HEADER)) {
                   return endpoint.substring(HTTP_HEADER.length());
               }
               return endpoint;
           }
       }
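      To see the rewrite that buildOSSCompleteUri performs, the following condensed, standalone sketch reproduces its logic. The endpoint shown is only an example; use your region's endpoint:

      ```java
      public class OSSUriDemo {
          // Condensed version of EMapReduceOSSUtil.buildOSSCompleteUri.
          static String buildOSSCompleteUri(String oriUri, String endpoint) {
              String schema = "oss://";
              if (endpoint == null || !oriUri.startsWith(schema)) {
                  return oriUri; // not an oss:// URI, or no endpoint to insert
              }
              int bucketIndex = schema.length();
              int pathIndex = oriUri.indexOf('/', bucketIndex);
              String bucket = pathIndex == -1 ? oriUri.substring(bucketIndex)
                                              : oriUri.substring(bucketIndex, pathIndex);
              // Strip a leading http:// from the endpoint, as stripHttp() does.
              String ep = endpoint.startsWith("http://") ? endpoint.substring(7) : endpoint;
              return schema + bucket + "." + ep
                      + (pathIndex > 0 ? oriUri.substring(pathIndex) : "");
          }

          public static void main(String[] args) {
              System.out.println(buildOSSCompleteUri("oss://mybucket/data/input",
                      "oss-cn-hangzhou.aliyuncs.com"));
              // prints oss://mybucket.oss-cn-hangzhou.aliyuncs.com/data/input
          }
      }
      ```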
    2. Add a new class named WordCount2 at the same directory level as the App class in the com.aliyun.emr.hadoop.examples package.
      package com.aliyun.emr.hadoop.examples;
       import java.io.BufferedReader;
       import java.io.FileReader;
       import java.io.IOException;
       import java.net.URI;
       import java.util.ArrayList;
       import java.util.HashSet;
       import java.util.List;
       import java.util.Set;
       import java.util.StringTokenizer;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.Path;
       import org.apache.hadoop.io.IntWritable;
       import org.apache.hadoop.io.Text;
       import org.apache.hadoop.mapreduce.Job;
       import org.apache.hadoop.mapreduce.Mapper;
       import org.apache.hadoop.mapreduce.Reducer;
       import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
       import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
       import org.apache.hadoop.mapreduce.Counter;
       import org.apache.hadoop.util.GenericOptionsParser;
       import org.apache.hadoop.util.StringUtils;
       public class WordCount2 {
           public static class TokenizerMapper
                   extends Mapper<Object, Text, Text, IntWritable>{
               static enum CountersEnum { INPUT_WORDS }
               private final static IntWritable one = new IntWritable(1);
               private Text word = new Text();
               private boolean caseSensitive;
               private Set<String> patternsToSkip = new HashSet<String>();
               private Configuration conf;
               private BufferedReader fis;
               @Override
               public void setup(Context context) throws IOException,
                       InterruptedException {
                   conf = context.getConfiguration();
                   caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
                    // Default to false: the job sets this to true only when -skip is
                    // specified. With a default of true, getCacheFiles() would return
                    // null when no skip file is cached.
                    if (conf.getBoolean("wordcount.skip.patterns", false)) {
                       URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
                       for (URI patternsURI : patternsURIs) {
                           Path patternsPath = new Path(patternsURI.getPath());
                           String patternsFileName = patternsPath.getName().toString();
                           parseSkipFile(patternsFileName);
                       }
                   }
               }
               private void parseSkipFile(String fileName) {
                   try {
                       fis = new BufferedReader(new FileReader(fileName));
                       String pattern = null;
                       while ((pattern = fis.readLine()) != null) {
                           patternsToSkip.add(pattern);
                       }
                   } catch (IOException ioe) {
                       System.err.println("Caught exception while parsing the cached file '"
                               + StringUtils.stringifyException(ioe));
                   }
               }
               @Override
               public void map(Object key, Text value, Context context
               ) throws IOException, InterruptedException {
                   String line = (caseSensitive) ?
                           value.toString() : value.toString().toLowerCase();
                   for (String pattern : patternsToSkip) {
                       line = line.replaceAll(pattern, "");
                   }
                   StringTokenizer itr = new StringTokenizer(line);
                   while (itr.hasMoreTokens()) {
                       word.set(itr.nextToken());
                       context.write(word, one);
                       Counter counter = context.getCounter(CountersEnum.class.getName(),
                               CountersEnum.INPUT_WORDS.toString());
                       counter.increment(1);
                   }
               }
           }
           public static class IntSumReducer
                   extends Reducer<Text,IntWritable,Text,IntWritable> {
               private IntWritable result = new IntWritable();
               public void reduce(Text key, Iterable<IntWritable> values,
                                  Context context
               ) throws IOException, InterruptedException {
                   int sum = 0;
                   for (IntWritable val : values) {
                       sum += val.get();
                   }
                   result.set(sum);
                   context.write(key, result);
               }
           }
           public static void main(String[] args) throws Exception {
               Configuration conf = new Configuration();
               conf.set("fs.oss.accessKeyId", "${accessKeyId}");
               conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
               conf.set("fs.oss.endpoint","${endpoint}");
               GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
               String[] remainingArgs = optionParser.getRemainingArgs();
                if (remainingArgs.length != 2 && remainingArgs.length != 4) {
                   System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
                   System.exit(2);
               }
               Job job = Job.getInstance(conf, "word count");
               job.setJarByClass(WordCount2.class);
               job.setMapperClass(TokenizerMapper.class);
               job.setCombinerClass(IntSumReducer.class);
               job.setReducerClass(IntSumReducer.class);
               job.setOutputKeyClass(Text.class);
               job.setOutputValueClass(IntWritable.class);
               List<String> otherArgs = new ArrayList<String>();
               for (int i=0; i < remainingArgs.length; ++i) {
                   if ("-skip".equals(remainingArgs[i])) {
                       job.addCacheFile(new Path(EMapReduceOSSUtil.buildOSSCompleteUri(remainingArgs[++i], conf)).toUri());
                       job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
                   } else {
                       otherArgs.add(remainingArgs[i]);
                   }
               }
               FileInputFormat.addInputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(0), conf)));
               FileOutputFormat.setOutputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(1), conf)));
               System.exit(job.waitForCompletion(true) ? 0 : 1);
           }
       }
  5. In the root directory of the project, run the following command to compile and package the files of the two new classes:
    mvn clean package -DskipTests

    A JAR file named wordcountv2-1.0-SNAPSHOT.jar is generated in the target directory of the project.

  6. Create a job.
    1. Upload the wordcountv2-1.0-SNAPSHOT.jar file obtained in Step 5 to OSS. For more information, see Upload objects.
      In this example, the file is uploaded to oss://<yourBucketName>/jars/wordcountv2-1.0-SNAPSHOT.jar.
    2. Download the The_Sorrows_of_Young_Werther.txt and patterns.txt files and upload them to your OSS directory.
      Note The_Sorrows_of_Young_Werther.txt is a text file in which the number of occurrences of each word needs to be counted. patterns.txt lists the word patterns to be ignored.
    3. Create a MapReduce job in the EMR console. For more information, see Configure a Hadoop MapReduce job.
      Job content:
      ossref://<yourBucketName>/jars/wordcountv2-1.0-SNAPSHOT.jar com.aliyun.emr.hadoop.examples.WordCount2 -D wordcount.case.sensitive=true oss://<yourBucketName>/jars/The_Sorrows_of_Young_Werther.txt oss://<yourBucketName>/jars/output -skip oss://<yourBucketName>/jars/patterns.txt

      Replace <yourBucketName> in the code with the name of the OSS bucket that you use. oss://<yourBucketName>/jars/output indicates the output path.

    4. On the Edit Job page, click Run.
      The MapReduce job starts to run in the cluster.
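The -D wordcount.case.sensitive and -skip options in the job content above control how the WordCount2 mapper preprocesses each input line before tokenizing it: the line is lowercased when case sensitivity is off, and every regular expression read from patterns.txt is removed. A minimal local sketch of that preprocessing follows; the pattern list shown is illustrative and mimics a typical patterns.txt:

```java
import java.util.List;

public class SkipDemo {
    // Mirrors the preprocessing in TokenizerMapper.map():
    // optional lowercasing, then removal of each skip pattern (treated as a regex).
    static String preprocess(String line, boolean caseSensitive, List<String> patternsToSkip) {
        String s = caseSensitive ? line : line.toLowerCase();
        for (String p : patternsToSkip) {
            s = s.replaceAll(p, ""); // each line of patterns.txt is applied as a regex
        }
        return s;
    }

    public static void main(String[] args) {
        // Hypothetical patterns: strip periods, commas, and exclamation marks.
        System.out.println(preprocess("To be, or not to be!", false,
                List.of("\\.", "\\,", "\\!")));
        // prints: to be or not to be
    }
}
```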