MapReduce

Last Updated: Jun 25, 2016

1) Select the WordCount example in the ODPS project:

2) Right-click on ‘WordCount.java’ and select <Run As -> ODPS MapReduce>, as follows:

3) In the dialog box that pops up, select ‘example_project’ and click <Finish>:

4) After the run succeeds, the following result is displayed:

Run User-defined MapReduce Program

1) Right-click on ‘src’ directory. Select <New -> Mapper>:

2) After you select ‘Mapper’, the following dialog box is displayed. Enter the name of the Mapper class and click <Finish>:

3) A file ‘UserMapper.java’ is now generated in the ‘src’ directory in ‘Package Explorer’. Its content is a template of the Mapper class:

    package odps;

    import java.io.IOException;

    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.MapperBase;

    public class UserMapper extends MapperBase {
        @Override
        public void setup(TaskContext context) throws IOException {
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
        }
    }

4) In the template, the package name defaults to ‘odps’; modify it according to your actual requirements. Fill in the template as follows:

    package odps;

    import java.io.IOException;

    import com.aliyun.odps.counter.Counter;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.MapperBase;

    public class UserMapper extends MapperBase {
        Record word;
        Record one;
        Counter gCnt;

        @Override
        public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.set(new Object[] { 1L });
            gCnt = context.getCounter("MyCounters", "global_counts");
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            for (int i = 0; i < record.getColumnCount(); i++) {
                String[] words = record.get(i).toString().split("\\s+");
                for (String w : words) {
                    word.set(new Object[] { w });
                    Counter cnt = context.getCounter("MyCounters", "map_outputs");
                    cnt.increment(1);
                    gCnt.increment(1);
                    context.write(word, one);
                }
            }
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
        }
    }
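The mapper's core logic — splitting each column's text on whitespace and emitting one (word, 1) pair per token — can be exercised outside ODPS. Below is a minimal plain-Java sketch with no SDK dependency; `tokenize` is a hypothetical helper mirroring the `split("\\s+")` call above (it additionally drops empty tokens, which the mapper itself does not):

```java
import java.util.ArrayList;
import java.util.List;

public class MapperSketch {
    // Mirrors UserMapper.map: split a column value on whitespace,
    // yielding the words that would each be emitted with count 1L.
    static List<String> tokenize(String columnValue) {
        List<String> words = new ArrayList<>();
        for (String w : columnValue.split("\\s+")) {
            if (!w.isEmpty()) {      // split can produce a leading empty token
                words.add(w);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("hello  world")); // [hello, world]
    }
}
```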

5) In the same way, right-click on the ‘src’ directory and select <New -> Reduce>:

Enter the name of the Reduce class (this example uses ‘UserReduce’ as the class name):

6) In ‘Package Explorer’, a file named ‘UserReduce.java’ is generated in the ‘src’ directory. Its content is a template of the Reduce class. Edit the template:

    package odps;

    import java.io.IOException;
    import java.util.Iterator;

    import com.aliyun.odps.counter.Counter;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.ReducerBase;

    public class UserReduce extends ReducerBase {
        private Record result;
        Counter gCnt;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
            gCnt = context.getCounter("MyCounters", "global_counts");
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            result.set(0, key.get(0));
            result.set(1, count);
            Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
            cnt.increment(1);
            gCnt.increment(1);
            context.write(result);
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
        }
    }
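The reducer simply sums the 1L values collected for each key. That accumulation loop can be isolated and checked on its own; the following is a hedged sketch (plain `Long` values stand in for the `Record` values the real reducer iterates):

```java
import java.util.Arrays;
import java.util.Iterator;

public class ReducerSketch {
    // Mirrors the while-loop in UserReduce.reduce: add up all the
    // per-word counts arriving for one key.
    static long sumCounts(Iterator<Long> values) {
        long count = 0;
        while (values.hasNext()) {
            count += values.next();
        }
        return count;
    }

    public static void main(String[] args) {
        // Three occurrences of the same word -> total count 3.
        System.out.println(sumCounts(Arrays.asList(1L, 1L, 1L).iterator())); // 3
    }
}
```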

7) Create the ‘main’ function: right-click on ‘src’ and select <New -> MapReduce Driver>. Fill in the Driver Name (this example uses ‘UserDriver’), the Mapper and Reduce classes (‘UserMapper’ and ‘UserReduce’ in this example), and click <Finish>. The file ‘UserDriver.java’ then appears in the ‘src’ directory:

8) Edit the contents of the driver:

    package odps;

    import com.aliyun.odps.OdpsException;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.RunningJob;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;

    public class UserDriver {
        public static void main(String[] args) throws OdpsException {
            JobConf job = new JobConf();
            job.setMapperClass(UserMapper.class);
            job.setReducerClass(UserReduce.class);
            job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
            job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
            InputUtils.addTable(
                    TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
            InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
            OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
            RunningJob rj = JobClient.runJob(job);
            rj.waitForCompletion();
        }
    }
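The `partSpec("p1=2/p2=1")` call in the driver names the single partition of ‘wc_in2’ to read, using the `key=value/key=value` form. To clarify that format, here is a hypothetical parser (not part of the ODPS SDK) that turns such a spec into an ordered map:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartSpecSketch {
    // Parse an ODPS-style partition spec such as "p1=2/p2=1"
    // into an ordered partition-column -> value map.
    static Map<String, String> parse(String spec) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : spec.split("/")) {
            String[] kv = pair.split("=", 2);
            parts.put(kv[0].trim(), kv[1].trim());
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(parse("p1=2/p2=1")); // {p1=2, p2=1}
    }
}
```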

9) Run the MapReduce program: right-click on ‘UserDriver.java’ and select <Run As -> ODPS MapReduce>. The following dialog box pops up:

10) Select ‘example_project’ as the ODPS project and click <Finish> to run the MapReduce program locally:

11) If output like the above is displayed, the local run succeeded. The output result is saved in the ‘warehouse’ directory. Refresh the ODPS project:

‘wc_out’ is the output directory and ‘R_000000’ is the result file. Once local debugging has confirmed the result is correct, you can package the MapReduce program with the Eclipse export function and upload the jar package to ODPS. For how to execute MapReduce in a distributed environment, refer to Quick Start.

12) After local debugging passes, package the code into a jar file through the Eclipse Export function for later use in the distributed environment. In this example, the package is named ‘mr-examples.jar’. Select the ‘src’ directory and click <Export>:

13) Select ‘Jar File’ as the export format:

14) Only the package in ‘src’ needs to be exported. Set the name of the jar file to ‘mr-examples.jar’:

15) Click <Next> to complete exporting the jar file.

If you want to simulate creating a new project locally, create a subdirectory (at the same level as example_project) in the ‘warehouse’ directory. The directory hierarchy is as follows:

    <warehouse>
    |____ example_project (project directory)
          |____ <__tables__>
          |     |____ table_name1 (non-partitioned table)
          |     |     |____ data (file)
          |     |     |____ <__schema__> (file)
          |     |
          |     |____ table_name2 (partitioned table)
          |           |____ partition_name=partition_value (partition directory)
          |           |     |____ data (file)
          |           |____ <__schema__> (file)
          |
          |____ <__resources__>
                |____ table_resource_name (table resource)
                |     |____ <__ref__>
                |____ file_resource_name (file resource)
‘__schema__’ Example:

Non-partitioned table:

    project=project_name
    table=table_name
    columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING

Partitioned table:

    project=project_name
    table=table_name
    columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
    partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING

Note: at present, five data types are supported — bigint, double, boolean, datetime, and string — corresponding to the Java types long, double, boolean, java.util.Date, and java.lang.String.
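The `columns=` (and `partitions=`) value is a comma-separated list of `name:TYPE` pairs. A hypothetical parser sketch, shown only to clarify the format:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaSketch {
    // Parse a __schema__ "columns=" value such as
    // "col1:BIGINT,col2:DOUBLE" into an ordered name -> type map.
    static Map<String, String> parseColumns(String columns) {
        Map<String, String> schema = new LinkedHashMap<>();
        for (String col : columns.split(",")) {
            String[] nt = col.split(":", 2);
            schema.put(nt[0].trim(), nt[1].trim().toUpperCase());
        }
        return schema;
    }

    public static void main(String[] args) {
        System.out.println(parseColumns("col1:BIGINT,col2:DOUBLE")); // {col1=BIGINT, col2=DOUBLE}
    }
}
```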

‘data’ Example:

    1,1.1,true,2015-06-04 11:22:42 896,hello world
    \N,\N,\N,\N,\N

Note: the time format is accurate to the millisecond, and NULL is represented by '\N' for all types.
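Reading such a ‘data’ line back means splitting on commas and mapping the literal `\N` marker to a Java null. A minimal sketch (a hypothetical reader, assuming field values themselves contain no commas, as in the example rows above):

```java
import java.util.Arrays;

public class DataLineSketch {
    // Split one 'data' file line on commas, turning the "\N"
    // NULL marker into a Java null, per the convention above.
    static String[] parseLine(String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        for (int i = 0; i < fields.length; i++) {
            if ("\\N".equals(fields[i])) {
                fields[i] = null;
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] f = parseLine("1,1.1,true,2015-06-04 11:22:42 896,hello world");
        System.out.println(f.length);                     // 5
        System.out.println(parseLine("\\N,\\N")[0] == null); // true
    }
}
```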

Notes:

  • When a MapReduce program runs locally, it searches the ‘warehouse’ directory for the corresponding tables or resources by default. If they do not exist there, the data is downloaded from the server and saved in ‘warehouse’, and then MapReduce runs locally.
  • After the MapReduce run finishes, refresh the ‘warehouse’ directory to view the generated result.