This example walks through running a MultipleInOut MapReduce job on MaxCompute. The job reads from two input tables and routes output to two separate output tables — one non-partitioned and one partitioned — using output labels to control which reducer writes where.
Prerequisites
Before you begin, make sure you have:
- Completed the environment setup described in Getting started
- The mapreduce-examples.jar file, stored in the bin\data\resources directory of your MaxCompute client installation
Set up test tables and resources
Create the tables
Run the following statements in your MaxCompute client to create two input tables and two output tables. mr_multiinout_out2 is partitioned; add two partitions to it before running the job.
CREATE TABLE wc_in1(key STRING, value STRING);
CREATE TABLE wc_in2(key STRING, value STRING);
CREATE TABLE mr_multiinout_out1 (key STRING, cnt BIGINT);
CREATE TABLE mr_multiinout_out2 (key STRING, cnt BIGINT) PARTITIONED BY (a string, b string);
ALTER TABLE mr_multiinout_out2 ADD PARTITION (a='1', b='1');
ALTER TABLE mr_multiinout_out2 ADD PARTITION (a='2', b='2');
Add the JAR resource
-- The -f flag overwrites an existing resource; you can omit it the first time you add the JAR.
add jar data\resources\mapreduce-examples.jar -f;
Upload test data
Use Tunnel to upload data1.txt and data2.txt from the bin directory of your MaxCompute client into the two input tables.
tunnel upload data1.txt wc_in1;
tunnel upload data2.txt wc_in2;
After the upload, the tables contain the following data:
wc_in1:
hello,odps
wc_in2:
hello,world
Run the job
Run the following command in your MaxCompute client:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.MultipleInOut wc_in1,wc_in2 mr_multiinout_out1,mr_multiinout_out2|a=1/b=1|out1,mr_multiinout_out2|a=2/b=2|out2;
The command takes two positional arguments:
| Argument | Format | Example |
|---|---|---|
| Inputs | Comma-separated table names | wc_in1,wc_in2 |
| Outputs | Comma-separated entries, each formatted as table_name\|partition_spec\|label | mr_multiinout_out2\|a=1/b=1\|out1 |
For outputs, partition_spec and label are optional. Omitting label routes output to the default (unlabeled) destination.
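As a standalone illustration of this format (it mirrors the parsing that the sample's main method performs later; the OutputSpecSketch class name is hypothetical), an output entry splits on | into up to three parts:

```java
public class OutputSpecSketch {
    // Splits one output entry into [table, partition spec, label];
    // parts that were omitted in the entry stay null.
    static String[] parse(String out) {
        String[] ss = out.split("\\|");
        return new String[] {
            ss[0],
            ss.length > 1 ? ss[1] : null,
            ss.length > 2 ? ss[2] : null
        };
    }

    public static void main(String[] args) {
        for (String out : new String[] {
                "mr_multiinout_out1",              // table only: default output
                "mr_multiinout_out2|a=1/b=1|out1"  // table, partition spec, label
        }) {
            String[] p = parse(out);
            System.out.println("table=" + p[0] + " part=" + p[1] + " label=" + p[2]);
        }
    }
}
```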
Expected results
The reducer routes each key to a specific output table based on the key's total count modulo 3:
| Condition | Destination | Output table |
|---|---|---|
| count % 3 == 0 | Default output | mr_multiinout_out1 |
| count % 3 == 1 | Label out1 | mr_multiinout_out2, partition a=1/b=1 |
| count % 3 == 2 | Label out2 | mr_multiinout_out2, partition a=2/b=2 |
The cleanup() method also writes one fixed record to each output after all keys are processed: ("default", 1L), ("out1", 1L), and ("out2", 1L).
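The routing rule can be checked outside MaxCompute with a small standalone sketch (the RoutingSketch class and the inlined test rows are illustrative, not part of the job):

```java
import java.util.Map;
import java.util.TreeMap;

public class RoutingSketch {
    // Same rule as the reducer: pick a destination label from count % 3.
    static String route(long count) {
        long mod = count % 3;
        return mod == 0 ? "default" : mod == 1 ? "out1" : "out2";
    }

    public static void main(String[] args) {
        // Word counts for the test rows "hello,odps" and "hello,world".
        Map<String, Long> counts = new TreeMap<>();
        for (String row : new String[] { "hello,odps", "hello,world" }) {
            for (String word : row.split(",")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            System.out.println(e.getKey() + " (" + e.getValue() + ") -> " + route(e.getValue()));
        }
    }
}
```

With this test data, hello (count 2) is routed to out2, while odps and world (count 1 each) go to out1, which is consistent with the result tables that follow.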
mr_multiinout_out1 — receives the ("default", 1L) record from cleanup(). No keys from the test data satisfy count % 3 == 0, so this table contains only one row:
+------------+------------+
| key | cnt |
+------------+------------+
| default | 1 |
+------------+------------+
mr_multiinout_out2 — receives keys with count % 3 == 1 in partition a=1/b=1 and keys with count % 3 == 2 in partition a=2/b=2, plus the fixed cleanup() records for each partition:
+--------+------------+---+---+
| key | cnt | a | b |
+--------+------------+---+---+
| odps | 1 | 1 | 1 |
| world | 1 | 1 | 1 |
| out1 | 1 | 1 | 1 |
| hello | 2 | 2 | 2 |
| out2 | 1 | 2 | 2 |
+--------+------------+---+---+
Sample code
For Project Object Model (POM) dependency configuration, see Precautions.
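As a rough sketch, the dependency typically looks like the following (the coordinates are assumed from the public MaxCompute MapReduce SDK on Maven Central; take the exact version from Precautions rather than this placeholder):

```xml
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <!-- Replace with the SDK version listed in Precautions -->
    <version>REPLACE_WITH_SDK_VERSION</version>
</dependency>
```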
The example has three main parts: the mapper, the reducer with multi-output routing, and the main method that parses input/output table arguments.
Mapper: tokenize input records
TokenizerMapper emits each field value as a key with count 1.
public static class TokenizerMapper extends MapperBase {
Record word;
Record one;
@Override
public void setup(TaskContext context) throws IOException {
word = context.createMapOutputKeyRecord();
one = context.createMapOutputValueRecord();
one.set(new Object[] { 1L });
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException {
for (int i = 0; i < record.getColumnCount(); i++) {
word.set(new Object[] { record.get(i).toString() });
context.write(word, one);
}
}
}
Reducer: route output by label
SumReducer creates one output record per labeled destination in setup(). In reduce(), it routes each key to a destination based on count % 3. The cleanup() method writes a fixed record to each output after all keys are processed.
The key APIs for multi-output routing are:
| API | Purpose |
|---|---|
| context.createOutputRecord() | Create a record for the default (unlabeled) output |
| context.createOutputRecord("out1") | Create a record for the output labeled "out1" |
| context.write(result) | Write to the default output |
| context.write(result1, "out1") | Write to the output labeled "out1" |
public static class SumReducer extends ReducerBase {
private Record result;
private Record result1;
private Record result2;
@Override
public void setup(TaskContext context) throws IOException {
/** Create a record for each output and add labels to distinguish outputs. */
result = context.createOutputRecord(); // default output (no label)
result1 = context.createOutputRecord("out1"); // output labeled "out1"
result2 = context.createOutputRecord("out2"); // output labeled "out2"
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long count = 0;
while (values.hasNext()) {
Record val = values.next();
count += (Long) val.get(0);
}
long mod = count % 3;
if (mod == 0) {
result.set(0, key.get(0));
result.set(1, count);
/** If you do not specify a label, the default output is used. */
context.write(result);
} else if (mod == 1) {
result1.set(0, key.get(0));
result1.set(1, count);
context.write(result1, "out1");
} else {
result2.set(0, key.get(0));
result2.set(1, count);
context.write(result2, "out2");
}
}
@Override
public void cleanup(TaskContext context) throws IOException {
Record result = context.createOutputRecord();
result.set(0, "default");
result.set(1, 1L);
context.write(result);
Record result1 = context.createOutputRecord("out1");
result1.set(0, "out1");
result1.set(1, 1L);
context.write(result1, "out1");
Record result2 = context.createOutputRecord("out2");
result2.set(0, "out2");
result2.set(1, 1L);
context.write(result2, "out2");
}
}
Main method: configure inputs and outputs
The main method parses input and output table strings from command-line arguments, then submits the job. Input entries use the format table_name or table_name|partition_spec. Output entries add an optional label: table_name|partition_spec|label.
/** Convert partition strings such as "ds=1/pt=2" to MAP. */
public static LinkedHashMap<String, String> convertPartSpecToMap(String partSpec) {
LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
if (partSpec != null && !partSpec.trim().isEmpty()) {
String[] parts = partSpec.split("/");
for (String part : parts) {
String[] ss = part.split("=");
if (ss.length != 2) {
throw new RuntimeException(
"ODPS-0730001: error part spec format: " + partSpec);
}
map.put(ss[0], ss[1]);
}
}
return map;
}
public static void main(String[] args) throws Exception {
String[] inputs = null;
String[] outputs = null;
if (args.length == 2) {
inputs = args[0].split(",");
outputs = args[1].split(",");
} else {
System.err.println("MultipleInOut in... out...");
System.exit(1);
}
JobConf job = new JobConf();
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(SumReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
/** Parse input table strings. */
for (String in : inputs) {
String[] ss = in.split("\\|");
if (ss.length == 1) {
InputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
} else if (ss.length == 2) {
LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
InputUtils.addTable(
TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
} else {
System.err.println("Style of input: " + in + " is not right");
System.exit(1);
}
}
/** Parse output table strings. */
for (String out : outputs) {
String[] ss = out.split("\\|");
if (ss.length == 1) {
OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
} else if (ss.length == 2) {
LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
OutputUtils.addTable(
TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
} else if (ss.length == 3) {
if (ss[1].isEmpty()) {
LinkedHashMap<String, String> map = convertPartSpecToMap(ss[2]);
OutputUtils.addTable(
TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
} else {
LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
OutputUtils.addTable(
TableInfo.builder().tableName(ss[0]).partSpec(map)
.label(ss[2]).build(), job);
}
} else {
System.err.println("Style of output: " + out + " is not right");
System.exit(1);
}
}
JobClient.runJob(job);
}
Full imports for this example:
package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;