MaxCompute:SecondarySort example

Last Updated: Mar 25, 2026

SecondarySort sorts MapReduce output by a composite key — grouping records by a primary key while sorting values by a secondary key within each group. This example shows how to implement SecondarySort in MaxCompute MapReduce using three job settings that work together: sort columns, partition columns, and grouping columns.

Prerequisites

Before you begin, ensure that you have:

- Installed and configured the MaxCompute client.
- Obtained the mapreduce-examples.jar package used in this example.

How it works

The mapper reads each input record containing two integers and emits a composite key-value pair ((key, value), value). The composite key carries both integers so the framework can sort by both.

Three job settings control what happens next:

| Setting | Columns | Purpose |
| --- | --- | --- |
| Sort columns | i1, i2 | Sort all map output by the primary key first, then by the secondary key within each primary key group |
| Partition columns | i1 | Route records with the same primary key to the same reducer |
| Grouping columns | i1 | Call the reducer's reduce() method once per unique primary key value |
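In the MaxCompute MapReduce API, these three settings are made on the JobConf. A minimal fragment, assuming a JobConf instance named job and the map output key schema i1:bigint,i2:bigint used in this example:

```java
// Fragment only; assumes `job` is a com.aliyun.odps.mapred.conf.JobConf
// whose map output key schema is "i1:bigint,i2:bigint".
job.setOutputKeySortColumns(new String[] { "i1", "i2" }); // sort columns
job.setPartitionColumns(new String[] { "i1" });           // partition columns
job.setOutputGroupingColumns(new String[] { "i1" });      // grouping columns
```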

The reducer receives one group per primary key and iterates over its values, which are already in sorted order. It writes each (key, value) pair to the output table.
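The reducer's per-group behavior can be sketched in plain Java. The method below is a hypothetical stand-in, not the ODPS API: it pairs the primary key with each already-sorted secondary value, which is exactly what the reduce() method in the sample code writes out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    // Hypothetical stand-in for the reducer logic: for one group, pair the
    // primary key with every secondary value (values arrive already sorted).
    static List<long[]> reduce(long key, List<Long> sortedValues) {
        List<long[]> out = new ArrayList<>();
        for (long v : sortedValues) {
            out.add(new long[] { key, v });
        }
        return out;
    }

    public static void main(String[] args) {
        // Group i1=1 with sorted values [1, 2] yields rows (1,1) and (1,2)
        for (long[] row : reduce(1, Arrays.asList(1L, 2L))) {
            System.out.println(row[0] + "," + row[1]);
        }
    }
}
```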

Prepare tables and resources

1. Create the input and output tables.

CREATE TABLE ss_in(key BIGINT, value BIGINT);
CREATE TABLE ss_out(key BIGINT, value BIGINT);

2. Add the JAR package as a MaxCompute resource.

add jar data\resources\mapreduce-examples.jar -f;
Omit the -f flag when adding the JAR package for the first time.

3. Upload test data to ss_in using Tunnel.

tunnel upload data.txt ss_in;

The data.txt file is in the bin directory of the MaxCompute client and contains:

1,2
2,1
1,1
2,2

Run SecondarySort

Run the following command on the MaxCompute client:

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.SecondarySort ss_in ss_out;

Expected output

After the job completes, query ss_out. Records are sorted by key ascending, then by value ascending within each key group:

+------------+------------+
| key        | value      |
+------------+------------+
| 1          | 1          |
| 1          | 2          |
| 2          | 1          |
| 2          | 2          |
+------------+------------+

Data flow walkthrough

The following shows how input is transformed at each stage.

Input records (ss_in):

(1, 2)
(2, 1)
(1, 1)
(2, 2)

Map output — composite key (i1, i2) paired with value i2:

key=(1,1), value=1
key=(1,2), value=2
key=(2,1), value=1
key=(2,2), value=2

The framework sorts all map output by (i1, i2) and routes records by i1 to the appropriate reducer.

Reduce input — two groups, one per unique i1 value:

Group i1=1: values [1, 2]
Group i1=2: values [1, 2]
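The sort-and-group step above can be simulated in plain Java. This is a sketch of the framework's semantics, not MaxCompute code: composite keys are ordered by (i1, i2), then grouped by i1 so each group's i2 values come out sorted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;

public class SecondarySortSim {
    /** Simulates the framework's sort-and-group step on composite keys (i1, i2). */
    static LinkedHashMap<Long, List<Long>> sortAndGroup(long[][] keys) {
        // Sort columns (i1, i2): order all map output by i1, then by i2
        long[][] sorted = keys.clone();
        Arrays.sort(sorted, Comparator.<long[]>comparingLong(k -> k[0])
                                      .thenComparingLong(k -> k[1]));
        // Grouping column (i1): one reduce() call per distinct i1, receiving
        // that group's i2 values already in sorted order
        LinkedHashMap<Long, List<Long>> groups = new LinkedHashMap<>();
        for (long[] k : sorted) {
            groups.computeIfAbsent(k[0], x -> new ArrayList<>()).add(k[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Map output keys for the sample input, in arrival order
        long[][] mapOutput = { {1, 2}, {2, 1}, {1, 1}, {2, 2} };
        System.out.println(sortAndGroup(mapOutput)); // prints {1=[1, 2], 2=[1, 2]}
    }
}
```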

Output (ss_out):

(1, 1), (1, 2), (2, 1), (2, 2)

Sample code

For Project Object Model (POM) dependency configuration, see Precautions.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.data.TableInfo;
/**
 *
 * This is an example ODPS Map/Reduce application. It reads the input table that
 * must contain two integers per record. The output is sorted by the first and
 * second number and grouped on the first number.
 *
 **/
public class SecondarySort {
    /**
     * Read two integers from each line and generate a key, value pair as
     * ((left, right), right).
     **/
    public static class MapClass extends MapperBase {
        private Record key;
        private Record value;
        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }
        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            long left = 0;
            long right = 0;
            if (record.getColumnCount() > 0) {
                left = (Long) record.get(0);
                if (record.getColumnCount() > 1) {
                    right = (Long) record.get(1);
                }
                key.set(new Object[] { left, right });
                value.set(new Object[] { right });
                context.write(key, value);
            }
        }
    }
    /**
     * A reducer class that emits each value in the group, paired with the
     * primary key.
     **/
    public static class ReduceClass extends ReducerBase {
        private Record result = null;
        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            result.set(0, key.get(0));
            while (values.hasNext()) {
                Record value = values.next();
                result.set(1, value.get(0));
                context.write(result);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: secondarysort <in> <out>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        /** Set the composite key columns that determine sort order. */
        // Sort by i1 first, then by i2 within each i1 group
        job.setOutputKeySortColumns(new String[] { "i1", "i2" });
        // Route records with the same i1 value to the same reducer
        job.setPartitionColumns(new String[] { "i1" });
        // Call reduce() once per unique i1 value
        job.setOutputGroupingColumns(new String[] { "i1" });
        // The map output key is a pair (i1, i2); the value carries i2
        job.setMapOutputKeySchema(SchemaUtils.fromString("i1:bigint,i2:bigint"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("i2x:bigint"));
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        JobClient.runJob(job);
        System.exit(0);
    }
}