
MapReduce development manual

Last Updated: Dec 28, 2017

Use OSS in MapReduce

To read data from or write data to OSS in a MapReduce job, you need to configure the following parameters.

  1. conf.set("fs.oss.accessKeyId", "${accessKeyId}");
  2. conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
  3. conf.set("fs.oss.endpoint","${endpoint}");

Parameter description:

${accessKeyId}: The AccessKey ID of your account.

${accessKeySecret}: The AccessKey Secret of your account.

${endpoint}: The endpoint used to access OSS. It depends on the region of your cluster, and the OSS bucket you access should be in the same region as the cluster.

Specific values can be found here.
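As a quick sanity check, the same configuration keys can be used to list an OSS path through the Hadoop FileSystem API before running a full job. The following is a minimal sketch, not part of the official example: the class name OssAccessCheck, bucket, and path are placeholders, and it assumes you run it on an E-MapReduce node where the oss:// FileSystem implementation is already on the classpath.

  // Hypothetical helper class for checking OSS access; not part of the official example.
  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class OssAccessCheck {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // Same keys as described above; replace the placeholders with real values.
          conf.set("fs.oss.accessKeyId", "${accessKeyId}");
          conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
          conf.set("fs.oss.endpoint", "${endpoint}");
          // List a directory on OSS to confirm the credentials and endpoint work.
          FileSystem fs = FileSystem.get(URI.create("oss://your-bucket/"), conf);
          for (FileStatus status : fs.listStatus(new Path("oss://your-bucket/your-path/"))) {
              System.out.println(status.getPath());
          }
      }
  }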

Word count

The following example shows how to read text from OSS and count the words in it.

  1. Write the program. Using Java code as an example, modify the WordCount example obtained from the official Hadoop website as follows. The only modification is adding the AccessKey ID and AccessKey Secret configuration to the code so that the job has permission to access the OSS files.

    package org.apache.hadoop.examples;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class EmrWordCount {

      public static class TokenizerMapper
          extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }

      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
          System.err.println("Usage: wordcount <in> [<in>...] <out>");
          System.exit(2);
        }
        conf.set("fs.oss.accessKeyId", "${accessKeyId}");
        conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
        conf.set("fs.oss.endpoint", "${endpoint}");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(EmrWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
          FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
  2. Compile the program. First, configure the JDK and Hadoop environments, and then run the following commands:

    mkdir wordcount_classes
    javac -classpath ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.6.0.jar:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/commons-cli-1.2.jar -d wordcount_classes EmrWordCount.java
    jar cvf wordcount.jar -C wordcount_classes .
  3. Create a job.

    • Upload the jar file prepared in the previous step to OSS. Log on to the OSS console and follow the instructions to upload it. Assume the path of the jar on OSS is oss://emr/jars/wordcount.jar, and the input and output paths are oss://emr/data/WordCount/Input and oss://emr/data/WordCount/Output respectively.

    • Create the following job in the E-MapReduce console (a sketch of the equivalent job parameters is shown below the Basic configuration screenshot):

      Basic configuration
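
      The exact fields on the Basic configuration screen depend on the console version; as an illustration only, and by analogy with the WordCount2 job parameters in the later Maven section, the job parameters for this job would look roughly like the following sketch (the jar, input, and output paths are the ones assumed above):

        jar ossref://emr/jars/wordcount.jar org.apache.hadoop.examples.EmrWordCount oss://emr/data/WordCount/Input oss://emr/data/WordCount/Output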

  4. Create the execution plan. Use E-MapReduce to create an execution plan, then add the job created above to it. Select Run now as the scheduling policy so that the wordcount job runs on the selected cluster.

Manage MapReduce jobs with a Maven project

When your project grows and becomes too complicated to manage effectively, we recommend using Maven (or a similar project-management tool) to manage it. The procedure is as follows:

  1. Install Maven. If you have not yet installed Maven, download it from the Maven website and install it.

  2. Generate the project framework. In the root directory of your project (assuming it is located at D:/workspace), run the following command:

    mvn archetype:generate -DgroupId=com.aliyun.emr.hadoop.examples -DartifactId=wordcountv2 -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

    Maven automatically generates an empty sample project at D:/workspace/wordcountv2 (consistent with the artifactId you specified) that contains a simple pom.xml and an App class (whose package path is consistent with the groupId you specified).

  3. Add the Hadoop dependencies. Open the project in any IDE, edit the pom.xml file, and add the following to the dependencies section:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
  4. Write the code. Add a new class WordCount2.java under the com.aliyun.emr.hadoop.examples package, alongside the App class, with the following content:

    package com.aliyun.emr.hadoop.examples;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.StringUtils;

    public class WordCount2 {

      public static class TokenizerMapper
          extends Mapper<Object, Text, Text, IntWritable> {

        static enum CountersEnum { INPUT_WORDS }

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<String>();
        private Configuration conf;
        private BufferedReader fis;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
          conf = context.getConfiguration();
          caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
          // Only parse the skip-pattern files when they were actually added to the cache.
          if (conf.getBoolean("wordcount.skip.patterns", false)) {
            URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
            for (URI patternsURI : patternsURIs) {
              Path patternsPath = new Path(patternsURI.getPath());
              String patternsFileName = patternsPath.getName().toString();
              parseSkipFile(patternsFileName);
            }
          }
        }

        private void parseSkipFile(String fileName) {
          try {
            fis = new BufferedReader(new FileReader(fileName));
            String pattern = null;
            while ((pattern = fis.readLine()) != null) {
              patternsToSkip.add(pattern);
            }
          } catch (IOException ioe) {
            System.err.println("Caught exception while parsing the cached file '"
                + StringUtils.stringifyException(ioe));
          }
        }

        @Override
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
          String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
          for (String pattern : patternsToSkip) {
            line = line.replaceAll(pattern, "");
          }
          StringTokenizer itr = new StringTokenizer(line);
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
            Counter counter = context.getCounter(CountersEnum.class.getName(),
                CountersEnum.INPUT_WORDS.toString());
            counter.increment(1);
          }
        }
      }

      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.oss.accessKeyId", "${accessKeyId}");
        conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
        conf.set("fs.oss.endpoint", "${endpoint}");
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if (remainingArgs.length != 2 && remainingArgs.length != 4) {
          System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        List<String> otherArgs = new ArrayList<String>();
        for (int i = 0; i < remainingArgs.length; ++i) {
          if ("-skip".equals(remainingArgs[i])) {
            job.addCacheFile(new Path(EMapReduceOSSUtil.buildOSSCompleteUri(remainingArgs[++i], conf)).toUri());
            job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
          } else {
            otherArgs.add(remainingArgs[i]);
          }
        }
        FileInputFormat.addInputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(0), conf)));
        FileOutputFormat.setOutputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(1), conf)));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

    The EMapReduceOSSUtil class referenced above is placed in the same directory as WordCount2. Refer to the following sample for its code:

    package com.aliyun.emr.hadoop.examples;

    import org.apache.hadoop.conf.Configuration;

    public class EMapReduceOSSUtil {

      private static String SCHEMA = "oss://";
      private static String AKSEP = ":";
      private static String BKTSEP = "@";
      private static String EPSEP = ".";
      private static String HTTP_HEADER = "http://";

      /**
       * Complete an OSS URI.
       * Converts a URI like oss://bucket/path to oss://accessKeyId:accessKeySecret@bucket.endpoint/path.
       * ossref URIs do not need this.
       *
       * @param oriUri original OSS URI
       */
      public static String buildOSSCompleteUri(String oriUri, String akId, String akSecret, String endpoint) {
        if (akId == null) {
          System.err.println("miss accessKeyId");
          return oriUri;
        }
        if (akSecret == null) {
          System.err.println("miss accessKeySecret");
          return oriUri;
        }
        if (endpoint == null) {
          System.err.println("miss endpoint");
          return oriUri;
        }
        int index = oriUri.indexOf(SCHEMA);
        if (index == -1 || index != 0) {
          return oriUri;
        }
        int bucketIndex = index + SCHEMA.length();
        int pathIndex = oriUri.indexOf("/", bucketIndex);
        String bucket = null;
        if (pathIndex == -1) {
          bucket = oriUri.substring(bucketIndex);
        } else {
          bucket = oriUri.substring(bucketIndex, pathIndex);
        }
        StringBuilder retUri = new StringBuilder();
        retUri.append(SCHEMA)
            .append(akId)
            .append(AKSEP)
            .append(akSecret)
            .append(BKTSEP)
            .append(bucket)
            .append(EPSEP)
            .append(stripHttp(endpoint));
        if (pathIndex > 0) {
          retUri.append(oriUri.substring(pathIndex));
        }
        return retUri.toString();
      }

      public static String buildOSSCompleteUri(String oriUri, Configuration conf) {
        return buildOSSCompleteUri(oriUri, conf.get("fs.oss.accessKeyId"), conf.get("fs.oss.accessKeySecret"), conf.get("fs.oss.endpoint"));
      }

      private static String stripHttp(String endpoint) {
        if (endpoint.startsWith(HTTP_HEADER)) {
          return endpoint.substring(HTTP_HEADER.length());
        }
        return endpoint;
      }
    }
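
    To illustrate what the helper does, here is a small usage sketch. The AccessKey values and the endpoint below are placeholders (the endpoint is only a sample region endpoint), not real credentials:

    // Usage sketch for EMapReduceOSSUtil; placeholder credentials and a sample endpoint.
    Configuration conf = new Configuration();
    conf.set("fs.oss.accessKeyId", "myAccessKeyId");
    conf.set("fs.oss.accessKeySecret", "myAccessKeySecret");
    conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
    String fullUri = EMapReduceOSSUtil.buildOSSCompleteUri("oss://emr/data/WordCount/Input", conf);
    // fullUri is now:
    // oss://myAccessKeyId:myAccessKeySecret@emr.oss-cn-hangzhou.aliyuncs.com/data/WordCount/Input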
  5. Compile and package it for uploading. In the project directory, run the following command:

    mvn clean package -DskipTests

    The job jar package, wordcountv2-1.0-SNAPSHOT.jar, is now in the target directory under your project directory. Upload this jar package to OSS.

  6. Create a job. Create a new job in E-MapReduce using the following parameter configurations:

    jar ossref://yourBucket/yourPath/wordcountv2-1.0-SNAPSHOT.jar com.aliyun.emr.hadoop.examples.WordCount2 -Dwordcount.case.sensitive=true oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt oss://yourBucket/yourPath/output -skip oss://yourBucket/yourPath/patterns.txt

    Here, “yourBucket” refers to one of your OSS buckets, and “yourPath” refers to a path within that bucket; fill both in according to your actual setup. The job also needs the input files oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt and oss://yourBucket/yourPath/patterns.txt. Download them from the links below and store them in the corresponding directories on your OSS.

    Download resources:

    The_Sorrows_of_Young_Werther.txt

    patterns.txt

  7. Create an execution plan and run it. Create an execution plan in E-MapReduce, associate it with the job, and run the plan.
