This example shows how to count word frequencies using a MaxCompute MapReduce Pipeline job. The pipeline chains one mapper (TokenizerMapper) and two reducers (SumReducer and IdentityReducer), demonstrating how the Pipeline API connects multiple processing stages into a single job.
Prerequisites
Before you begin, ensure that you have:
Completed the environment setup described in Getting started
Set up test resources
1. Prepare the JAR package
Place mapreduce-examples.jar in the bin\data\resources directory of your MaxCompute client installation.
2. Create tables
Run the following SQL statements in the MaxCompute client:
CREATE TABLE wc_in (key STRING, value STRING);
CREATE TABLE wc_out (key STRING, cnt BIGINT);
3. Add the JAR package as a resource
add jar data\resources\mapreduce-examples.jar -f;
The -f flag overwrites an existing resource with the same name. Omit it the first time you add the JAR package.
4. Import test data
Use a Tunnel command to upload data.txt from the bin directory of the MaxCompute client into wc_in:
tunnel upload data.txt wc_in;
The data.txt file contains the following line:
hello,odps
Run the pipeline
Run the following command in the MaxCompute client:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.WordCountPipeline wc_in wc_out;
Verify the result
After the job completes, query wc_out to confirm the output:
SELECT * FROM wc_out;
The expected output is:
+------------+------------+
| key | cnt |
+------------+------------+
| hello | 1 |
| odps | 1 |
+------------+------------+
Sample code
For Project Object Model (POM) dependency configuration, see Precautions.
How the pipeline works
The pipeline chains three stages. Each stage transforms the data as follows:
wc_in (key STRING, value STRING) → TokenizerMapper → (word STRING, count BIGINT) → SumReducer → (word STRING, count BIGINT) → IdentityReducer → wc_out (key STRING, cnt BIGINT)
| Class | Role |
|---|---|
| TokenizerMapper | Splits each record into tokens and emits (word, 1) for each token |
| SumReducer | Aggregates counts for each word key |
| IdentityReducer | Writes the final (word, count) pairs to the output table |
The two-reducer design separates aggregation from output formatting. SumReducer accumulates totals across all mapper outputs. IdentityReducer then maps the intermediate schema to the wc_out table schema, keeping the two concerns independent and making each reducer easier to test and replace.
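The separation of concerns described above can be sketched with plain Java collections, outside the MaxCompute SDK. This is an illustrative stand-in (the class and method names here are hypothetical, not part of the SDK): `sumStage` plays the role of SumReducer and `identityStage` plays the role of IdentityReducer, so either stage can be tested or swapped independently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoStageReduce {

    // SumReducer analogue: fold all (word, 1) pairs into per-word totals.
    static Map<String, Long> sumStage(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return totals;
    }

    // IdentityReducer analogue: reposition values into the output schema
    // (key STRING, cnt BIGINT) without changing them.
    static List<Object[]> identityStage(Map<String, Long> totals) {
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, Long> e : totals.entrySet()) {
            rows.add(new Object[] { e.getKey(), e.getValue() });
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> mapped =
                List.of(Map.entry("hello", 1L), Map.entry("odps", 1L));
        for (Object[] row : identityStage(sumStage(mapped))) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

In the real pipeline, both stages operate on Record objects and the framework handles grouping by key; the point of the sketch is only that aggregation and output formatting are independent transformations.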
If you omit OutputKeySortColumns, PartitionColumns, and OutputGroupingColumns for a mapper during pipeline construction, the framework uses the mapper's OutputKey as the default value for all three parameters.
Java code
package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.pipeline.Pipeline;

public class WordCountPipeline {

    public static class TokenizerMapper extends MapperBase {
        Record word;
        Record one;

        @Override
        public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.setBigint(0, 1L);
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            for (int i = 0; i < record.getColumnCount(); i++) {
                String[] words = record.get(i).toString().split("\\s+");
                for (String w : words) {
                    word.setString(0, w);
                    context.write(word, one);
                }
            }
        }
    }

    public static class SumReducer extends ReducerBase {
        private Record value;

        @Override
        public void setup(TaskContext context) throws IOException {
            value = context.createOutputValueRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            value.set(0, count);
            context.write(key, value);
        }
    }

    public static class IdentityReducer extends ReducerBase {
        private Record result;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            while (values.hasNext()) {
                result.set(0, key.get(0));
                result.set(1, values.next().get(0));
                context.write(result);
            }
        }
    }

    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.err.println("Usage: WordCountPipeline <in_table> <out_table>");
            System.exit(2);
        }
        Job job = new Job();
        /*
         * During pipeline construction, if you do not specify OutputKeySortColumns,
         * PartitionColumns, and OutputGroupingColumns for a mapper, the framework
         * uses the mapper's OutputKey as the default value of these parameters.
         */
        Pipeline pipeline = Pipeline.builder()
                .addMapper(TokenizerMapper.class)
                .setOutputKeySchema(
                        new Column[] { new Column("word", OdpsType.STRING) })
                .setOutputValueSchema(
                        new Column[] { new Column("count", OdpsType.BIGINT) })
                .setOutputKeySortColumns(new String[] { "word" })
                .setPartitionColumns(new String[] { "word" })
                .setOutputGroupingColumns(new String[] { "word" })
                .addReducer(SumReducer.class)
                .setOutputKeySchema(
                        new Column[] { new Column("word", OdpsType.STRING) })
                .setOutputValueSchema(
                        new Column[] { new Column("count", OdpsType.BIGINT) })
                .addReducer(IdentityReducer.class)
                .createPipeline();
        /* Add the pipeline to the job. To configure a combiner, use JobConf. */
        job.setPipeline(pipeline);
        /* Configure the input and output tables. */
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        /* Submit the job and wait for it to complete. */
        job.submit();
        job.waitForCompletion();
        System.exit(job.isSuccessful() ? 0 : 1);
    }
}
Data flow walk-through
With input hello,odps, the Tunnel command loads the data as a single record into wc_in, storing hello in the key column and odps in the value column.
TokenizerMapper iterates over all columns of each record using record.getColumnCount(). For each column value, it calls split("\\s+") and emits one (word, 1) pair per token. Because hello and odps each contain no whitespace, each is treated as a single token. The mapper emits:
(hello, 1)
(odps, 1)
SumReducer receives all values for each word key and accumulates the count. Each word appears once, so it emits:
(hello, 1)
(odps, 1)
IdentityReducer reads the (word, count) pairs from SumReducer and writes them to the wc_out output table, mapping column positions to match the (key STRING, cnt BIGINT) schema.
Final output in wc_out:
| hello | 1 |
| odps | 1 |
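The walk-through above can be replayed in plain Java without the MaxCompute SDK. This sketch (the class name `WordCountFlow` is hypothetical) applies the same three steps in-process: split each column value on whitespace with split("\\s+"), merge counts per word, and emit (key, cnt) rows.

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountFlow {

    // records: each inner array is one wc_in row, one String per column.
    static Map<String, Long> run(String[][] records) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] record : records) {                 // each wc_in row
            for (String column : record) {                // each column value
                for (String w : column.split("\\s+")) {   // TokenizerMapper step
                    counts.merge(w, 1L, Long::sum);       // SumReducer step
                }
            }
        }
        return counts;  // IdentityReducer step would write these as (key, cnt) rows
    }

    public static void main(String[] args) {
        // One record with columns "hello" and "odps", as loaded by the Tunnel command.
        run(new String[][] { { "hello", "odps" } })
                .forEach((k, v) -> System.out.println("| " + k + " | " + v + " |"));
        // Prints:
        // | hello | 1 |
        // | odps | 1 |
    }
}
```

Note that "hello" and "odps" land in separate columns only because the Tunnel upload splits the line on the comma; split("\\s+") inside the mapper never sees the comma-joined string.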