SecondarySort sorts MapReduce output by a composite key: records are grouped by a primary key, and values are sorted by a secondary key within each group. This example shows how to implement SecondarySort in MaxCompute MapReduce using three job settings that work together: sort columns, partition columns, and grouping columns.
Prerequisites
Before you begin, ensure that you have:
Completed the environment setup described in Getting started
How it works
The mapper reads each input record containing two integers and emits a composite key-value pair ((key, value), value). The composite key carries both integers so the framework can sort by both.
Three job settings control what happens next:
| Setting | Columns | Purpose |
|---|---|---|
| Sort columns | i1, i2 | Sort all map output by primary key first, then by secondary key within each primary key group |
| Partition columns | i1 | Route records with the same primary key to the same reducer |
| Grouping columns | i1 | Call the reducer's reduce() method once per unique primary key value |
The reducer receives one group per primary key and iterates over its values, which are already in sorted order. It writes each (key, value) pair to the output table.
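The interplay of the three settings can be hard to picture. The following standalone sketch (plain Java, not the MaxCompute API; the class and method names are invented for illustration) simulates what the framework does with the sample data: sort map output by the composite key (i1, i2), then group by i1 so that each reduce call sees its values already ordered by i2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;

public class SecondarySortSimulation {
    // Simulates the framework's sort and grouping semantics for this example.
    // Each record is a long[2] holding (i1, i2).
    public static LinkedHashMap<Long, List<Long>> sortAndGroup(long[][] records) {
        // Sort columns (i1, i2): order all map output by the primary key
        // first, then by the secondary key within each primary key
        long[][] sorted = records.clone();
        Arrays.sort(sorted, Comparator
                .<long[]>comparingLong(r -> r[0])   // i1
                .thenComparingLong(r -> r[1]));     // i2
        // Grouping column (i1): one reduce() call per unique primary key;
        // the values inside each group arrive already sorted by i2
        LinkedHashMap<Long, List<Long>> groups = new LinkedHashMap<>();
        for (long[] r : sorted) {
            groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        long[][] input = { { 1, 2 }, { 2, 1 }, { 1, 1 }, { 2, 2 } };
        System.out.println(sortAndGroup(input));
        // prints {1=[1, 2], 2=[1, 2]}
    }
}
```

Partitioning is left out of the sketch because with a single reducer it does not change the result; in a real job it only decides which reducer receives each i1 group.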
Prepare tables and resources
1. Create the input and output tables.
CREATE TABLE ss_in(key BIGINT, value BIGINT);
CREATE TABLE ss_out(key BIGINT, value BIGINT);
2. Add the JAR package as a MaxCompute resource.
add jar data\resources\mapreduce-examples.jar -f;
Omit the -f flag when adding the JAR package for the first time.
3. Upload test data to ss_in using Tunnel.
tunnel upload data.txt ss_in;
The data.txt file is in the bin directory of the MaxCompute client and contains:
1,2
2,1
1,1
2,2
Run SecondarySort
Run the following command on the MaxCompute client:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.SecondarySort ss_in ss_out;
Expected output
After the job completes, query ss_out. Records are sorted by key ascending, then by value ascending within each key group:
+------------+------------+
| key | value |
+------------+------------+
| 1 | 1 |
| 1 | 2 |
| 2 | 1 |
| 2 | 2 |
+------------+------------+
Data flow walkthrough
The following shows how input is transformed at each stage.
Input records (ss_in):
(1, 2)
(2, 1)
(1, 1)
(2, 2)
Map output, a composite key (i1, i2) paired with value i2:
key=(1,1), value=1
key=(1,2), value=2
key=(2,1), value=1
key=(2,2), value=2
The framework sorts all map output by (i1, i2) and routes records by i1 to the appropriate reducer.
Reduce input — two groups, one per unique i1 value:
Group i1=1: values [1, 2]
Group i1=2: values [1, 2]
Output (ss_out):
(1, 1), (1, 2), (2, 1), (2, 2)
Sample code
For Project Object Model (POM) dependency configuration, see Precautions.
package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * This is an example ODPS Map/Reduce application. It reads an input table
 * whose records each contain two integers. The output is sorted by the first
 * and second number and grouped on the first number.
 */
public class SecondarySort {
  /**
   * Read two integers from each record and generate a key-value pair as
   * ((left, right), right).
   */
  public static class MapClass extends MapperBase {
    private Record key;
    private Record value;

    @Override
    public void setup(TaskContext context) throws IOException {
      key = context.createMapOutputKeyRecord();
      value = context.createMapOutputValueRecord();
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      long left = 0;
      long right = 0;
      if (record.getColumnCount() > 0) {
        left = (Long) record.get(0);
        if (record.getColumnCount() > 1) {
          right = (Long) record.get(1);
        }
        key.set(new Object[] { left, right });
        value.set(new Object[] { right });
        context.write(key, value);
      }
    }
  }

  /**
   * A reducer class that emits each value in its group, paired with the
   * group's primary key.
   */
  public static class ReduceClass extends ReducerBase {
    private Record result = null;

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      result.set(0, key.get(0));
      while (values.hasNext()) {
        Record value = values.next();
        result.set(1, value.get(0));
        context.write(result);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: SecondarySort <in> <out>");
      System.exit(2);
    }
    JobConf job = new JobConf();
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    // Set the composite key columns that determine sort order:
    // sort by i1 first, then by i2 within each i1 group
    job.setOutputKeySortColumns(new String[] { "i1", "i2" });
    // Route records with the same i1 value to the same reducer
    job.setPartitionColumns(new String[] { "i1" });
    // Call reduce() once per unique i1 value
    job.setOutputGroupingColumns(new String[] { "i1" });
    // The map output key is a pair (i1, i2); the value carries i2
    job.setMapOutputKeySchema(SchemaUtils.fromString("i1:bigint,i2:bigint"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("i2x:bigint"));
    InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
    OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
    JobClient.runJob(job);
    System.exit(0);
  }
}