
MaxCompute: Pipeline examples

Last Updated: Mar 26, 2026

This example shows how to count word frequencies using a MaxCompute MapReduce Pipeline job. The pipeline chains one mapper (TokenizerMapper) and two reducers (SumReducer and IdentityReducer), demonstrating how the Pipeline API connects multiple processing stages into a single job.

Prerequisites

Before you begin, ensure that you have installed and configured the MaxCompute client.

Set up test resources

1. Prepare the JAR package

Place mapreduce-examples.jar in the bin\data\resources directory of your MaxCompute client installation.

2. Create tables

Run the following SQL statements in the MaxCompute client:

CREATE TABLE wc_in (key STRING, value STRING);
CREATE TABLE wc_out (key STRING, cnt BIGINT);

3. Add the JAR package as a resource

add jar data\resources\mapreduce-examples.jar -f;
The -f flag overwrites an existing resource with the same name. Omit it the first time you add the JAR package.

4. Import test data

Use a Tunnel command to upload data.txt from the bin directory of the MaxCompute client into wc_in:

tunnel upload data.txt wc_in;

The data.txt file contains the following line:

hello,odps

Run the pipeline

Run the following command in the MaxCompute client:

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCountPipeline wc_in wc_out;

Verify the result

After the job completes, query wc_out to confirm the output:

SELECT * FROM wc_out;

The expected output is:

+------------+------------+
| key        | cnt        |
+------------+------------+
| hello      | 1          |
| odps       | 1          |
+------------+------------+

Sample code

For Project Object Model (POM) dependency configuration, see Precautions.

How the pipeline works

The pipeline chains three stages. Each stage transforms the data as follows:

wc_in (key STRING, value STRING)
  -> TokenizerMapper -> (word STRING, count BIGINT)
  -> SumReducer -> (word STRING, count BIGINT)
  -> IdentityReducer -> wc_out (key STRING, cnt BIGINT)

TokenizerMapper: splits each record into tokens and emits a (word, 1) pair for each token.
SumReducer: aggregates the counts for each word key.
IdentityReducer: writes the final (word, count) pairs to the output table.

The two-reducer design separates aggregation from output formatting. SumReducer accumulates totals across all mapper outputs. IdentityReducer then maps the intermediate schema to the wc_out table schema, keeping the two concerns independent and making each reducer easier to test and replace.

If you omit OutputKeySortColumns, PartitionColumns, and OutputGroupingColumns for a mapper during pipeline construction, the framework uses the mapper's OutputKey as the default value for all three parameters.
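
The fallback rule can be illustrated with a minimal stand-in (plain Java, not the MaxCompute SDK; the class and field names here are hypothetical): when the three column parameters are left unset, each resolves to the stage's output key columns.

```java
import java.util.Arrays;

public class DefaultColumnsDemo {
    // A hypothetical stand-in for one pipeline stage's configuration.
    static class StageConfig {
        String[] outputKeyColumns;
        String[] sortColumns, partitionColumns, groupingColumns;

        // Mimics the documented rule: an unset parameter defaults to the output key.
        String[] resolvedSortColumns() { return sortColumns != null ? sortColumns : outputKeyColumns; }
        String[] resolvedPartitionColumns() { return partitionColumns != null ? partitionColumns : outputKeyColumns; }
        String[] resolvedGroupingColumns() { return groupingColumns != null ? groupingColumns : outputKeyColumns; }
    }

    public static void main(String[] args) {
        StageConfig mapper = new StageConfig();
        mapper.outputKeyColumns = new String[] { "word" };
        // None of the three parameters is set, so all three resolve to { "word" }.
        System.out.println(Arrays.toString(mapper.resolvedSortColumns()));
        System.out.println(Arrays.toString(mapper.resolvedPartitionColumns()));
        System.out.println(Arrays.toString(mapper.resolvedGroupingColumns()));
    }
}
```

In the full sample below, the mapper sets all three parameters explicitly to "word", so the result is the same either way.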

Java code

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.pipeline.Pipeline;

public class WordCountPipeline {
    public static class TokenizerMapper extends MapperBase {
        Record word;
        Record one;

        @Override
        public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.setBigint(0, 1L);
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            // Emit (word, 1) for every whitespace-delimited token in every column.
            for (int i = 0; i < record.getColumnCount(); i++) {
                String[] words = record.get(i).toString().split("\\s+");
                for (String w : words) {
                    word.setString(0, w);
                    context.write(word, one);
                }
            }
        }
    }

    public static class SumReducer extends ReducerBase {
        private Record value;

        @Override
        public void setup(TaskContext context) throws IOException {
            value = context.createOutputValueRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            // Accumulate the counts for one word key.
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            value.set(0, count);
            context.write(key, value);
        }
    }

    public static class IdentityReducer extends ReducerBase {
        private Record result;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            // Copy each (word, count) pair into the output table schema.
            while (values.hasNext()) {
                result.set(0, key.get(0));
                result.set(1, values.next().get(0));
                context.write(result);
            }
        }
    }

    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.err.println("Usage: WordCountPipeline <in_table> <out_table>");
            System.exit(2);
        }
        Job job = new Job();
        /**
         * During pipeline construction, if you do not specify OutputKeySortColumns,
         * PartitionColumns, and OutputGroupingColumns for a mapper, the framework
         * uses the mapper's OutputKey as the default value for all three parameters.
         */
        Pipeline pipeline = Pipeline.builder()
            .addMapper(TokenizerMapper.class)
                .setOutputKeySchema(
                    new Column[] { new Column("word", OdpsType.STRING) })
                .setOutputValueSchema(
                    new Column[] { new Column("count", OdpsType.BIGINT) })
                .setOutputKeySortColumns(new String[] { "word" })
                .setPartitionColumns(new String[] { "word" })
                .setOutputGroupingColumns(new String[] { "word" })
            .addReducer(SumReducer.class)
                .setOutputKeySchema(
                    new Column[] { new Column("word", OdpsType.STRING) })
                .setOutputValueSchema(
                    new Column[] { new Column("count", OdpsType.BIGINT) })
            .addReducer(IdentityReducer.class)
            .createPipeline();
        /** Add the pipeline to the job. To configure a combiner, use JobConf. */
        job.setPipeline(pipeline);
        /** Configure the input and output tables. */
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        /** Submit the job and wait for it to complete. */
        job.submit();
        job.waitForCompletion();
        System.exit(job.isSuccessful() ? 0 : 1);
    }
}

Data flow walk-through

With input hello,odps, the Tunnel command loads the data as a single record into wc_in, storing hello in the key column and odps in the value column.

TokenizerMapper iterates over all columns of each record using record.getColumnCount(). For each column value, it calls split("\\s+") and emits one (word, 1) pair per token. Because hello and odps each contain no whitespace, each is treated as a single token. The mapper emits:

(hello, 1)
(odps, 1)
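
The tokenization step can be reproduced outside MaxCompute with plain Java (a minimal sketch; the class name is made up for this demo). It applies the same split("\\s+") call to the two column values of the single wc_in record:

```java
public class TokenizeDemo {
    public static void main(String[] args) {
        // The two column values of the single wc_in record.
        String[] columns = { "hello", "odps" };
        for (String column : columns) {
            // Same tokenization as TokenizerMapper: split on runs of whitespace.
            for (String w : column.split("\\s+")) {
                System.out.println("(" + w + ", 1)");
            }
        }
    }
}
```

Because neither column value contains whitespace, each split returns a one-element array and exactly two pairs are printed.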

SumReducer receives all values for each word key and accumulates the count. Each word appears once, so it emits:

(hello, 1)
(odps, 1)

IdentityReducer reads the (word, count) pairs from SumReducer and writes them to the wc_out output table, mapping column positions to match the (key STRING, cnt BIGINT) schema.
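
The three stages above can be simulated end to end in plain Java (a sketch with in-memory collections standing in for the framework's shuffle; class and variable names are invented for the demo):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PipelineWalkthrough {
    public static void main(String[] args) {
        // Stage 0: the single wc_in record, as loaded by the Tunnel upload.
        String[][] wcIn = { { "hello", "odps" } };

        // Stage 1 (TokenizerMapper): emit (word, 1) per token of every column.
        List<Map.Entry<String, Long>> mapped = new ArrayList<>();
        for (String[] record : wcIn) {
            for (String column : record) {
                for (String w : column.split("\\s+")) {
                    mapped.add(new AbstractMap.SimpleEntry<>(w, 1L));
                }
            }
        }

        // Stage 2 (SumReducer): group by word and accumulate the counts.
        Map<String, Long> summed = new TreeMap<>();
        for (Map.Entry<String, Long> e : mapped) {
            summed.merge(e.getKey(), e.getValue(), Long::sum);
        }

        // Stage 3 (IdentityReducer): copy the pairs into the (key, cnt) schema.
        for (Map.Entry<String, Long> row : summed.entrySet()) {
            System.out.println(row.getKey() + "\t" + row.getValue());
        }
    }
}
```

Running it prints one row per word, matching the query result on wc_out.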

Final output in wc_out:

| hello      | 1          |
| odps       | 1          |