Select the WordCount example in the MaxCompute project:



Right-click WordCount.java and choose Run As -> ODPS MapReduce, as follows:



In the dialog box that appears, select example_project and click Finish:



After the run is completed, the following result is displayed:



Run User-defined MapReduce Program

Right-click the src directory and select New -> Mapper:



After you select Mapper, the following dialog box is displayed. Enter the name of the Mapper class and click Finish:



The file UserMapper.java is generated in the src directory in Package Explorer. The content of this file is a template of the Mapper class:

package odps;
import java.io.IOException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
public class UserMapper extends MapperBase {
    @Override
    public void setup(TaskContext context) throws IOException {
    }
    @Override
    public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
    }
    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}

In the template, the package name defaults to odps; you can change it as needed. Edit the template content as follows:

package odps;
import java.io.IOException;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
public class UserMapper extends MapperBase {
    Record word;
    Record one;
    Counter gCnt;
    @Override
    public void setup(TaskContext context) throws IOException {
        // Create reusable key/value records for the map output and a custom counter.
        word = context.createMapOutputKeyRecord();
        one = context.createMapOutputValueRecord();
        one.set(new Object[] { 1L });
        gCnt = context.getCounter("MyCounters", "global_counts");
    }
    @Override
    public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
        // Split every column of the input record into words and emit (word, 1).
        for (int i = 0; i < record.getColumnCount(); i++) {
            String[] words = record.get(i).toString().split("\\s+");
            for (String w : words) {
                word.set(new Object[] { w });
                Counter cnt = context.getCounter("MyCounters", "map_outputs");
                cnt.increment(1);
                gCnt.increment(1);
                context.write(word, one);
            }
        }
    }
    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}

Similarly, right-click the src directory and select New -> Reduce:



Enter the name of the Reduce class (in this example, UserReduce).

In Package Explorer, a file named UserReduce.java is generated in the src directory. Its content is a template of the Reduce class. Edit the template as follows:

package odps;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
public class UserReduce extends ReducerBase {
    private Record result;
    Counter gCnt;
    @Override
    public void setup(TaskContext context) throws IOException {
        result = context.createOutputRecord();
        gCnt = context.getCounter("MyCounters", "global_counts");
    }
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
        // Sum the counts emitted for this key and write (word, count) to the output table.
        long count = 0;
        while (values.hasNext()) {
            Record val = values.next();
            count += (Long) val.get(0);
        }
        result.set(0, key.get(0));
        result.set(1, count);
        Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
        cnt.increment(1);
        gCnt.increment(1);
        context.write(result);
    }
    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}

Create the main function: right-click src and select New -> MapReduce Driver. Enter the Driver Name (in this example, UserDriver) and the Mapper and Reduce class names (in this example, UserMapper and UserReduce), and click Finish. The file UserDriver.java is then displayed in the src directory:



Edit the driver content:

package odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.examples.mr.WordCount.SumCombiner;
import com.aliyun.odps.examples.mr.WordCount.SumReducer;
import com.aliyun.odps.examples.mr.WordCount.TokenizerMapper;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class UserDriver {
    public static void main(String[] args) throws OdpsException {
        JobConf job = new JobConf();
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumReducer.class);
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        InputUtils.addTable(
            TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
        InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
        RunningJob rj = JobClient.runJob(job);
        rj.waitForCompletion();
    }
}

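Note that the driver above reuses the SDK's built-in WordCount classes (TokenizerMapper, SumCombiner, and SumReducer). If you would rather have the job run the UserMapper and UserReduce classes written in this section, a minimal variant of the driver could look like the sketch below; dropping the combiner and keeping the same input/output tables and schemas are assumptions made for illustration, not settings taken from the original example.

package odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class UserDriver {
    public static void main(String[] args) throws OdpsException {
        JobConf job = new JobConf();
        // Run the user-defined classes created above instead of the WordCount example classes.
        job.setMapperClass(UserMapper.class);
        job.setReducerClass(UserReduce.class);
        // The map output schema must match the key/value records built in UserMapper.setup().
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        // Input and output tables are kept the same as in the example above (an assumption).
        InputUtils.addTable(
            TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
        InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
        RunningJob rj = JobClient.runJob(job);
        rj.waitForCompletion();
    }
}
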
Run the MapReduce program: right-click UserDriver.java and select Run As -> ODPS MapReduce. The following dialog box is displayed:



Select example_project as the MaxCompute project and click Finish to run the MapReduce program locally:



If the output matches the preceding figure, the local run succeeded. The output result is saved in the warehouse directory. Refresh the MaxCompute project:



wc_out is the output directory and R_000000 is the result file. Once local debugging confirms that the result is correct, you can package the MapReduce program using the Eclipse Export function and upload the JAR package to MaxCompute. For more information about running MapReduce in the distributed environment, see Quick Start.

After local debugging is completed, package the code into a JAR file using the Eclipse Export function for later use in the distributed environment. In this example, the package name is mr-examples.jar. Select the src directory and click Export:



Select JAR file as the export type:



Export only the package in src. Specify the JAR file name as mr-examples.jar:



Click Next to export the JAR file.

If you want to simulate creating a new project locally, create a subdirectory at the same level as example_project in the warehouse directory. The directory hierarchy is as follows:

<warehouse>
   |____example_project(Project Directory)
          |____ <__tables__>
          |       |____table_name1(non-partition table)
          |       |       |____ data(file)
          |       |       |____ <__schema__>(file)
          |       |____table_name2(partition table)
          |               |____ partition_name=partition_value(partition directory)
          |               |       |____ data(file)
          |               |____ <__schema__>(file)
          |____ <__resources__>
                  |____table_resource_name(table resource)
                  |       |____ <__ref__>
                  |____ file_resource_name(file resource)

Example of the <__schema__> file content:

Non-partition table:
project=project_name
table=table_name
columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
Partition table:
project=project_name
table=table_name
columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
Note:
Currently, the following five data types are supported: bigint, double, boolean, datetime, and string, which correspond to the Java types long, double, boolean, java.util.Date, and java.lang.String respectively.

Example of the data file content:

1,1.1,true,2015-06-04 11:22:42 896,hello world
\N,\N,\N,\N,\N
Note:
The time format is accurate to the millisecond, and NULL values of any type are represented by '\N'.
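
To make the type correspondence and the '\N' convention concrete, the following is a minimal, hypothetical mapper (SchemaProbeMapper is not part of the walkthrough) that reads one row of a table with the five-column schema shown above; it assumes the typed Record accessors getBigint, getDouble, getBoolean, getDatetime, getString, and isNull from the MaxCompute SDK.

package odps;
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
// Hypothetical example: reads a record whose table schema is
// col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING.
public class SchemaProbeMapper extends MapperBase {
    @Override
    public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
        // A '\N' cell in the local data file surfaces as a Java null;
        // record.isNull(i) checks a column before reading it.
        Long col1 = record.isNull(0) ? null : record.getBigint(0); // bigint   -> long
        Double col2 = record.getDouble(1);                         // double   -> double
        Boolean col3 = record.getBoolean(2);                       // boolean  -> boolean
        Date col4 = record.getDatetime(3);                         // datetime -> java.util.Date (millisecond precision)
        String col5 = record.getString(4);                         // string   -> java.lang.String
        System.out.println(recordNum + ": " + col1 + "," + col2
                + "," + col3 + "," + col4 + "," + col5);
    }
}
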
Note:
  • When a MapReduce program runs locally, it searches the warehouse directory for the required tables or resources by default. If they do not exist, the corresponding data is downloaded from the server, saved in warehouse, and the MapReduce program is then run locally.
  • After the MapReduce run finishes, refresh the warehouse directory to view the generated results.