This example walks you through running a WordCount job in MaxCompute MapReduce — from creating input tables and uploading data to running the job and verifying the output.
How it works
The WordCount example follows the standard MapReduce data flow:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
Three classes implement this pipeline:
TokenizerMapper: reads each record from the input table, emits each field value as a key with a count of 1
SumCombiner: aggregates counts locally on the mapper node before sending data to the reducer, reducing network traffic
SumReducer: sums all counts for each unique key and writes the final result to the output table
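The effect of local aggregation can be sketched in plain Java, without the MaxCompute SDK. The class `MapCombineSketch` below is illustrative only (it is not part of the SDK): with two identical input rows, the map stage alone would emit four key-value pairs, while combining on the mapper node shrinks that to two pairs before the shuffle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapCombineSketch {
    // Map + combine for one mapper's share of rows: emit (field, 1) per
    // column, then aggregate locally so fewer pairs cross the network.
    static Map<String, Long> mapAndCombine(String[][] rows) {
        Map<String, Long> combined = new LinkedHashMap<>();
        for (String[] row : rows) {
            for (String field : row) {
                combined.merge(field, 1L, Long::sum);
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        // Two identical rows: map alone would emit 4 pairs;
        // the combiner shrinks that to 2 aggregated pairs.
        String[][] rows = { { "hello", "odps" }, { "hello", "odps" } };
        Map<String, Long> combined = mapAndCombine(rows);
        System.out.println(combined); // {hello=2, odps=2}
    }
}
```

In the real job, SumCombiner plays this role between TokenizerMapper and SumReducer, which is why it extends ReducerBase: it performs the same summation as the reducer, just scoped to one mapper's output.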
Prerequisites
Before you begin, ensure that you have:
Completed the environment setup described in Getting started
Prepare the environment
1. Prepare the JAR package.
The JAR package for this example is mapreduce-examples.jar, located in the bin\data\resources directory of the MaxCompute client installation path.
Create the input and output tables:
CREATE TABLE wc_in (key STRING, value STRING);
CREATE TABLE wc_out (key STRING, cnt BIGINT);
Add the JAR package as a resource:
add jar data\resources\mapreduce-examples.jar -f;
Omit -f when adding the JAR package for the first time.
2. Upload input data.
Use Tunnel to upload data.txt from the bin directory of the MaxCompute client into the wc_in table:
tunnel upload data.txt wc_in;
The following data is loaded into wc_in:
hello,odps
Run WordCount
Run the WordCount job on the MaxCompute client:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out
Verify the result
After the job completes, query the wc_out table to see the output:
SELECT * FROM wc_out;
The result should match the following:
+------------+------------+
| key | cnt |
+------------+------------+
| hello | 1 |
| odps | 1 |
+------------+------------+
Sample code
For Project Object Model (POM) dependency configuration, see the Precautions section in Getting started.
package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class WordCount {
// Mapper: reads each record, emits (word, 1) for every field value
public static class TokenizerMapper extends MapperBase {
private Record word;
private Record one;
@Override
public void setup(TaskContext context) throws IOException {
word = context.createMapOutputKeyRecord();
one = context.createMapOutputValueRecord();
one.set(new Object[] { 1L });
System.out.println("TaskID:" + context.getTaskID().toString());
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException {
for (int i = 0; i < record.getColumnCount(); i++) {
word.set(new Object[] { record.get(i).toString() });
context.write(word, one);
}
}
}
// Combiner: aggregates map output locally before shuffling to the reducer
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long c = 0;
while (values.hasNext()) {
Record val = values.next();
c += (Long) val.get(0);
}
count.set(0, c);
context.write(key, count);
}
}
// Reducer: sums counts for each unique key and writes the final output
public static class SumReducer extends ReducerBase {
private Record result = null;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long count = 0;
while (values.hasNext()) {
Record val = values.next();
count += (Long) val.get(0);
}
result.set(0, key.get(0));
result.set(1, count);
context.write(result);
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordCount <in_table> <out_table>");
System.exit(2);
}
JobConf job = new JobConf();
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(SumCombiner.class);
job.setReducerClass(SumReducer.class);
// Define the intermediate key-value schema between mapper and reducer
job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
// Set input and output tables
InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
JobClient.runJob(job);
}
}
Walk-through
To illustrate how data flows through the pipeline, here is what each stage produces for the input hello,odps:
Mapper output — TokenizerMapper emits one key-value pair per field:
<hello, 1>
<odps, 1>
Combiner output — SumCombiner aggregates locally (in this small example, counts remain 1):
<hello, 1>
<odps, 1>
Reducer output — SumReducer writes the final summed counts to wc_out:
<hello, 1>
<odps, 1>
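The stages above can be reproduced with a small local simulation in plain Java. The class `WordCountWalkThrough` and its method names are illustrative stand-ins, not MaxCompute SDK APIs; the simulation only mirrors the logic of TokenizerMapper, SumCombiner, and SumReducer for the single row hello,odps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountWalkThrough {

    // Map + combine: emit (field, 1) per column of the row, aggregated
    // on the mapper side (mirrors TokenizerMapper + SumCombiner).
    static Map<String, Long> mapAndCombine(String[] row) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (String field : row) {
            out.merge(field, 1L, Long::sum);
        }
        return out;
    }

    // Reduce: sum the combined counts per key (mirrors SumReducer;
    // trivial here, since there is only one mapper's output to merge).
    static Map<String, Long> reduce(Map<String, Long> combined) {
        Map<String, Long> result = new LinkedHashMap<>();
        combined.forEach((key, count) -> result.merge(key, count, Long::sum));
        return result;
    }

    public static void main(String[] args) {
        String[] row = { "hello", "odps" }; // the row loaded into wc_in
        Map<String, Long> result = reduce(mapAndCombine(row));
        System.out.println(result); // {hello=1, odps=1}
    }
}
```

With a single input row each word appears once, so the counts pass through the combiner and reducer unchanged, matching the wc_out table shown earlier.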