MapReduce

Last Updated: Dec 05, 2017

  1. Select the WordCount example in the MaxCompute project:

  2. Right-click ‘WordCount.java’ and select Run As -> ODPS MapReduce:

  3. In the dialog box that appears, select ‘example_project’ and click Finish:

  4. After the run succeeds, the following result is displayed:

Run a User-defined MapReduce Program

  1. Right-click the ‘src’ directory and select New -> Mapper:

  2. After you select ‘Mapper’, the following dialog box is displayed. Enter the name of the Mapper class and click Finish:

  3. A file named ‘UserMapper.java’ is now generated in the ‘src’ directory in ‘Package Explorer’. Its content is a template of the Mapper class:

    package odps;

    import java.io.IOException;

    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.MapperBase;

    public class UserMapper extends MapperBase {
        @Override
        public void setup(TaskContext context) throws IOException {
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
        }
    }
  4. In the template, the package name defaults to ‘odps’. Modify it as needed. Edit the template contents as follows:

    package odps;

    import java.io.IOException;

    import com.aliyun.odps.counter.Counter;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.MapperBase;

    public class UserMapper extends MapperBase {
        Record word;
        Record one;
        Counter gCnt;

        @Override
        public void setup(TaskContext context) throws IOException {
            // Reusable output records: the key holds a word, the value holds
            // the constant count 1.
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.set(new Object[] { 1L });
            gCnt = context.getCounter("MyCounters", "global_counts");
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            // Split every column of the input record on whitespace and emit
            // each word with a count of 1, incrementing the custom counters.
            for (int i = 0; i < record.getColumnCount(); i++) {
                String[] words = record.get(i).toString().split("\\s+");
                for (String w : words) {
                    word.set(new Object[] { w });
                    Counter cnt = context.getCounter("MyCounters", "map_outputs");
                    cnt.increment(1);
                    gCnt.increment(1);
                    context.write(word, one);
                }
            }
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
        }
    }
  5. In the same way, right-click the ‘src’ directory and select New -> Reduce:

    Enter the name of the Reduce class (this example uses ‘UserReduce’):

  6. In ‘Package Explorer’, a file named ‘UserReduce.java’ is generated in the ‘src’ directory. Its content is a template of the Reduce class. Edit the template:

    package odps;

    import java.io.IOException;
    import java.util.Iterator;

    import com.aliyun.odps.counter.Counter;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.ReducerBase;

    public class UserReduce extends ReducerBase {
        private Record result;
        Counter gCnt;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
            gCnt = context.getCounter("MyCounters", "global_counts");
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            // Sum the counts of all values that share the same word (key),
            // then write one output record per word.
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            result.set(0, key.get(0));
            result.set(1, count);
            Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
            cnt.increment(1);
            gCnt.increment(1);
            context.write(result);
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
        }
    }
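    Optionally, a combiner can pre-aggregate counts on the map side before they are shuffled to the reducer. The sketch below is modeled on the SumCombiner class shipped with the WordCount example; the class name ‘UserCombiner’ is ours, and if you use it you also need to call job.setCombinerClass(UserCombiner.class) in the driver created in the next step:

    package odps;

    import java.io.IOException;
    import java.util.Iterator;

    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.ReducerBase;

    // Hypothetical combiner: runs on the map side and writes partial sums
    // back into a map-output value record, reducing shuffle volume.
    public class UserCombiner extends ReducerBase {
        private Record count;

        @Override
        public void setup(TaskContext context) throws IOException {
            count = context.createMapOutputValueRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long c = 0;
            while (values.hasNext()) {
                c += (Long) values.next().get(0);
            }
            count.set(0, c);
            context.write(key, count);
        }
    }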
  7. Create the ‘main’ function: right-click ‘src’ and select New -> MapReduce Driver. Enter the Driver Name (this example uses ‘UserDriver’), the Mapper (‘UserMapper’) and the Reduce (‘UserReduce’), and click Finish. The file ‘UserDriver.java’ is then generated in the ‘src’ directory:

  8. Edit the contents of the driver:

    package odps;

    import com.aliyun.odps.OdpsException;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.RunningJob;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;

    public class UserDriver {
        public static void main(String[] args) throws OdpsException {
            JobConf job = new JobConf();
            // Wire in the Mapper and Reduce classes created above.
            job.setMapperClass(UserMapper.class);
            job.setReducerClass(UserReduce.class);
            // Schema of the intermediate map-output key/value records.
            job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
            job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
            // Inputs: columns col2/col3 of wc_in1, and partition p1=2/p2=1 of wc_in2.
            InputUtils.addTable(TableInfo.builder().tableName("wc_in1")
                    .cols(new String[] { "col2", "col3" }).build(), job);
            InputUtils.addTable(TableInfo.builder().tableName("wc_in2")
                    .partSpec("p1=2/p2=1").build(), job);
            OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
            RunningJob rj = JobClient.runJob(job);
            rj.waitForCompletion();
        }
    }
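    After the job completes, you can read back the custom counters that the Mapper and Reduce classes increment in the ‘MyCounters’ group. The following is a minimal sketch, assuming the SDK’s RunningJob.getCounters() and Counters.findCounter(group, name) methods, which mirror the Hadoop-style counter API:

    // Append at the end of main(), after rj.waitForCompletion().
    // Assumes: import com.aliyun.odps.counter.Counters;
    Counters counters = rj.getCounters();
    long globalCounts = counters.findCounter("MyCounters", "global_counts").getValue();
    System.out.println("global_counts = " + globalCounts);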
  9. Run the MapReduce program: right-click ‘UserDriver.java’ and select Run As -> ODPS MapReduce. The following dialog box appears:

  10. Select ‘example_project’ as the MaxCompute project and click Finish to run the MapReduce program locally:

  11. If output like the above is displayed, the local run succeeded. The output result is saved in the ‘warehouse’ directory. Refresh the MaxCompute project:

    ‘wc_out’ is the output directory and ‘R_000000’ is the result file. Once local debugging confirms that the result is correct, you can package the MapReduce program through the Eclipse export function and upload the jar package to MaxCompute. For how to execute MapReduce in a distributed environment, see Quick Start.

  12. After local debugging passes, you can package the code into a jar through the Eclipse Export function for later use in the distributed environment. In this example, the package is named ‘mr-examples.jar’. Select the ‘src’ directory and click Export:

  13. Select ‘JAR file’ as the export format:

  14. You only need to export the package in ‘src’. Set the JAR file name to ‘mr-examples.jar’:

  15. Click Next to complete the export of the jar file.
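    For the later distributed run described in Quick Start, the exported jar is typically uploaded to MaxCompute as a resource and launched from the console client. A sketch, assuming the console client (odpscmd) and the resource and class names from this example:

    add jar mr-examples.jar;
    jar -resources mr-examples.jar -classpath mr-examples.jar odps.UserDriver;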

    If you want to simulate creating a new project locally, create a subdirectory at the same level as example_project in the ‘warehouse’ directory. The directory hierarchy is as follows:

    <warehouse>
    |____example_project(project directory)
         |____ <__tables__>
         |    |__table_name1(non-partition table)
         |    |    |____ data(file)
         |    |    |
         |    |    |____ <__schema__>(file)
         |    |
         |    |__table_name2(partition table)
         |         |____ partition_name=partition_value(partition directory)
         |         |    |____ data(file)
         |         |
         |         |____ <__schema__>(file)
         |
         |____ <__resources__>
              |
              |___table_resource_name(table resource)
              |    |____<__ref__>
              |
              |___ file_resource_name(file resource)

    ‘__schema__’ Example:

    Non-partition table:

    project=project_name
    table=table_name
    columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING

    Partition table:

    project=project_name
    table=table_name
    columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
    partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING

    Note: At present five data types are supported: bigint, double, boolean, datetime and string. They correspond to the Java types long, double, boolean, java.util.Date and java.lang.String respectively.
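    For instance, a __schema__ file for the partitioned input table ‘wc_in2’ used in the driver above could look like the following (the column names colc and cold are made up for illustration):

    project=example_project
    table=wc_in2
    columns=colc:STRING,cold:STRING
    partitions=p1:BIGINT,p2:BIGINT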

    ‘data’ Example:

    1,1.1,true,2015-06-04 11:22:42 896,hello world
    \N,\N,\N,\N,\N

    Note: The time format is accurate to the millisecond, and NULL is represented by ‘\N’ for all types.
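    Continuing the hypothetical ‘wc_in2’ example, a data file placed under its partition directory p1=2/p2=1 (two STRING columns) could then contain rows such as:

    hello world,hello odps
    odps mapreduce,hello world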

    Note:

    • When a MapReduce program runs locally, it searches the ‘warehouse’ directory for the corresponding tables or resources by default. If they do not exist, the corresponding data is downloaded from the server, saved in ‘warehouse’, and MapReduce is then run locally.
    • After the MapReduce run finishes, refresh the ‘warehouse’ directory to view the generated results.