MaxCompute: WordCount example

Last Updated: Mar 26, 2026

This example walks you through running a WordCount job in MaxCompute MapReduce, from creating the input and output tables and uploading data to running the job and verifying the output.

How it works

The WordCount example follows the standard MapReduce data flow:

(input) <k1, v1>  ->  map  ->  <k2, v2>  ->  combine  ->  <k2, v2>  ->  reduce  ->  <k3, v3>  (output)

Three classes implement this pipeline:

  • TokenizerMapper: reads each record from the input table and emits each field value as a key with a count of 1

  • SumCombiner: aggregates counts locally on the mapper node before sending data to the reducer, reducing network traffic

  • SumReducer: sums all counts for each unique key and writes the final result to the output table
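The three roles above can be sketched in plain Java without the MaxCompute SDK. This is only an illustrative in-memory model, not how MaxCompute schedules or shuffles data: the class name WordCountFlowSketch, its methods, and the idea of passing each mapper's input split as a String[][] are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of map -> combine -> reduce (no MaxCompute SDK).
class WordCountFlowSketch {

    // Map stage: emit (field, 1) for every field of one input record.
    static List<Map.Entry<String, Long>> map(String[] record) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String field : record) {
            out.add(Map.entry(field, 1L));
        }
        return out;
    }

    // Combine and reduce use the same logic here: sum the counts per key.
    static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Long::sum);
        }
        return sums;
    }

    // Each element of splits is the set of records seen by one mapper.
    static Map<String, Long> run(String[][][] splits) {
        List<Map.Entry<String, Long>> shuffled = new ArrayList<>();
        for (String[][] split : splits) {
            List<Map.Entry<String, Long>> mapped = new ArrayList<>();
            for (String[] record : split) {
                mapped.addAll(map(record));
            }
            // Combiner: pre-aggregate this mapper's output before the shuffle.
            shuffled.addAll(sumByKey(mapped).entrySet());
        }
        // Reducer: sum the partial counts that arrive from all mappers.
        return sumByKey(shuffled);
    }

    public static void main(String[] args) {
        String[][][] splits = {
            { {"hello", "odps"}, {"hello", "world"} },  // records for mapper 1
            { {"odps", "odps"} }                        // records for mapper 2
        };
        System.out.println(run(splits));  // prints {hello=2, odps=3, world=1}
    }
}
```

Summing is associative and commutative, which is why the same logic can serve as both the combiner and the reducer in this sketch.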

Prerequisites

Before you begin, ensure that you have:

Prepare the environment

1. Prepare the JAR package.

The JAR package for this example is mapreduce-examples.jar, located in the bin\data\resources directory of the MaxCompute client installation path.

2. Create the input and output tables.

CREATE TABLE wc_in (key STRING, value STRING);
CREATE TABLE wc_out (key STRING, cnt BIGINT);

3. Add the JAR package as a resource.

add jar data\resources\mapreduce-examples.jar -f;

The -f flag overwrites an existing resource that has the same name. Omit -f when adding the JAR package for the first time.

4. Upload input data.

Use Tunnel to upload data.txt from the bin directory of the MaxCompute client into the wc_in table:

tunnel upload data.txt wc_in;

The following data is loaded into wc_in:

hello,odps

Run WordCount

Run the WordCount job on the MaxCompute client:

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out

Verify the result

After the job completes, query the wc_out table to see the output:

SELECT * FROM wc_out;

The result should match the following:

+------------+------------+
| key        | cnt        |
+------------+------------+
| hello      | 1          |
| odps       | 1          |
+------------+------------+

Sample code

For Project Object Model (POM) dependency configuration, see the Precautions section in Getting started.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class WordCount {

    // Mapper: reads each record, emits (word, 1) for every field value
    public static class TokenizerMapper extends MapperBase {
        private Record word;
        private Record one;

        @Override
        public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.set(new Object[] { 1L });
            System.out.println("TaskID:" + context.getTaskID().toString());
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
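            for (int i = 0; i < record.getColumnCount(); i++) {
                Object field = record.get(i);
                if (field == null) {
                    // Skip NULL columns; calling toString() on them would throw
                    continue;
                }
                word.set(new Object[] { field.toString() });
                context.write(word, one);
            }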
        }
    }

    // Combiner: aggregates map output locally before shuffling to the reducer
    public static class SumCombiner extends ReducerBase {
        private Record count;

        @Override
        public void setup(TaskContext context) throws IOException {
            count = context.createMapOutputValueRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long c = 0;
            while (values.hasNext()) {
                Record val = values.next();
                c += (Long) val.get(0);
            }
            count.set(0, c);
            context.write(key, count);
        }
    }

    // Reducer: sums counts for each unique key and writes the final output
    public static class SumReducer extends ReducerBase {
        private Record result = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            result.set(0, key.get(0));
            result.set(1, count);
            context.write(result);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <in_table> <out_table>");
            System.exit(2);
        }

        JobConf job = new JobConf();
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumReducer.class);

        // Define the intermediate key-value schema between mapper and reducer
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));

        // Set input and output tables
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);

        JobClient.runJob(job);
    }
}

Walk-through

To illustrate how data flows through the pipeline, here is what each stage produces for the input hello,odps:

Mapper output

TokenizerMapper emits one key-value pair per field:

<hello, 1>
<odps,  1>

Combiner output

SumCombiner aggregates locally (in this small example, counts remain 1):

<hello, 1>
<odps,  1>

Reducer output

SumReducer writes the final summed counts to wc_out:

<hello, 1>
<odps,  1>
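Because every word in hello,odps occurs once, the walk-through cannot show the combiner changing anything. The sketch below traces the mapper and combiner stages for a hypothetical input in which one mapper receives two rows, hello,odps and hello,hello. It is plain Java that imitates the stage outputs; the class name CombinerEffectSketch and its methods are made up for illustration and are not part of the MaxCompute SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical trace of the pairs produced before and after the combiner.
class CombinerEffectSketch {

    // Mapper stage: one <field, 1> pair per field, in input order.
    static List<String> mapperOutput(String[][] records) {
        List<String> pairs = new ArrayList<>();
        for (String[] record : records) {
            for (String field : record) {
                pairs.add("<" + field + ", 1>");
            }
        }
        return pairs;
    }

    // Combiner stage: one <key, partial sum> pair per distinct key, sorted by key.
    static List<String> combinerOutput(String[][] records) {
        Map<String, Long> sums = new TreeMap<>();
        for (String[] record : records) {
            for (String field : record) {
                sums.merge(field, 1L, Long::sum);
            }
        }
        List<String> pairs = new ArrayList<>();
        for (Map.Entry<String, Long> entry : sums.entrySet()) {
            pairs.add("<" + entry.getKey() + ", " + entry.getValue() + ">");
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Hypothetical rows handled by a single mapper.
        String[][] records = { {"hello", "odps"}, {"hello", "hello"} };
        System.out.println("mapper:   " + mapperOutput(records));
        System.out.println("combiner: " + combinerOutput(records));
    }
}
```

For this input the mapper stage produces four pairs (<hello, 1>, <odps, 1>, <hello, 1>, <hello, 1>) and the combiner stage reduces them to two (<hello, 3>, <odps, 1>), so fewer pairs would need to be sent to the reducer.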