
MaxCompute:MultipleInOut example

Last Updated: Mar 25, 2026

This example walks through running a MultipleInOut MapReduce job on MaxCompute. The job reads from two input tables and routes output to two separate output tables — one non-partitioned and one partitioned — using output labels to control which reducer writes where.

Prerequisites

Before you begin, make sure you have:

  • Completed the environment setup described in Getting started

  • The mapreduce-examples.jar file, stored in the bin\data\resources directory of your MaxCompute client installation

Set up test tables and resources

Create the tables

Run the following statements in your MaxCompute client to create two input tables and two output tables. mr_multiinout_out2 is partitioned; add two partitions to it before running the job.

CREATE TABLE wc_in1(key STRING, value STRING);
CREATE TABLE wc_in2(key STRING, value STRING);
CREATE TABLE mr_multiinout_out1 (key STRING, cnt BIGINT);
CREATE TABLE mr_multiinout_out2 (key STRING, cnt BIGINT) PARTITIONED BY (a STRING, b STRING);
ALTER TABLE mr_multiinout_out2 ADD PARTITION (a='1', b='1');
ALTER TABLE mr_multiinout_out2 ADD PARTITION (a='2', b='2');

Add the JAR resource

-- The -f flag overwrites an existing resource of the same name.
-- You can omit it when adding the JAR for the first time.
add jar data\resources\mapreduce-examples.jar -f;

Upload test data

Use Tunnel to upload data1.txt and data2.txt from the bin directory of your MaxCompute client into the two input tables.

tunnel upload data1.txt wc_in1;
tunnel upload data2.txt wc_in2;

After the upload, the tables contain the following data:

wc_in1:

hello,odps

wc_in2:

hello,world

Run the job

Run the following command in your MaxCompute client:

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.MultipleInOut wc_in1,wc_in2 mr_multiinout_out1,mr_multiinout_out2|a=1/b=1|out1,mr_multiinout_out2|a=2/b=2|out2;

The command takes two positional arguments:

  • Inputs: comma-separated table names. Example: wc_in1,wc_in2

  • Outputs: comma-separated entries, each formatted as table_name|partition_spec|label. Example: mr_multiinout_out2|a=1/b=1|out1

For outputs, partition_spec and label are optional. Omitting label routes output to the default (unlabeled) destination.
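To make the accepted forms concrete, here is a small standalone sketch (plain Java, no MaxCompute dependencies; the class and method names are illustrative, not part of the sample) that splits an output entry the same way the sample's main method does:

```java
public class OutputSpecSketch {
    // Splits an output entry of the form table_name|partition_spec|label,
    // where partition_spec and label are optional.
    static String describe(String entry) {
        String[] ss = entry.split("\\|");
        StringBuilder sb = new StringBuilder("table=" + ss[0]);
        if (ss.length > 1 && !ss[1].isEmpty()) {
            sb.append(" partition=").append(ss[1]);
        }
        if (ss.length > 2) {
            sb.append(" label=").append(ss[2]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Table name only: writes to the default output with no partition.
        System.out.println(describe("mr_multiinout_out1"));
        // Full form: table, partition spec, and label.
        System.out.println(describe("mr_multiinout_out2|a=1/b=1|out1"));
    }
}
```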

Expected results

The reducer routes each key to a specific output table based on the key's total count modulo 3:

  • count % 3 == 0: default output, written to mr_multiinout_out1

  • count % 3 == 1: label out1, written to mr_multiinout_out2, partition a=1/b=1

  • count % 3 == 2: label out2, written to mr_multiinout_out2, partition a=2/b=2

The cleanup() method also writes one fixed record to each output after all keys are processed: ("default", 1L), ("out1", 1L), and ("out2", 1L).

mr_multiinout_out1 — receives the ("default", 1L) record from cleanup(). No keys from the test data satisfy count % 3 == 0, so this table contains only one row:

+------------+------------+
| key        | cnt        |
+------------+------------+
| default    | 1          |
+------------+------------+

mr_multiinout_out2 — receives keys with count % 3 == 1 in partition a=1/b=1 and keys with count % 3 == 2 in partition a=2/b=2, plus the fixed cleanup() records for each partition:

+--------+------------+---+---+
| key    | cnt        | a | b |
+--------+------------+---+---+
| odps   | 1          | 1 | 1 |
| world  | 1          | 1 | 1 |
| out1   | 1          | 1 | 1 |
| hello  | 2          | 2 | 2 |
| out2   | 1          | 2 | 2 |
+--------+------------+---+---+
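You can sanity-check this routing without running the job. The sketch below (plain Java, no MaxCompute dependencies; the class name is illustrative) recomputes the word counts from the combined test data and applies the same modulo-3 rule as the reducer:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoutingSketch {
    // Mirrors the reducer's routing rule: total count modulo 3 picks the destination.
    static String route(long count) {
        long mod = count % 3;
        if (mod == 0) {
            return "default";
        } else if (mod == 1) {
            return "out1";
        }
        return "out2";
    }

    public static void main(String[] args) {
        // Combined mapper output for the test data: wc_in1 = hello,odps and wc_in2 = hello,world.
        List<String> words = Arrays.asList("hello", "odps", "hello", "world");
        Map<String, Long> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(word, 1L, Long::sum);
        }
        // hello has count 2 -> out2; odps and world have count 1 -> out1.
        counts.forEach((word, count) ->
            System.out.println(word + " -> " + route(count)));
    }
}
```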

Sample code

For Project Object Model (POM) dependency configuration, see Precautions.

The example has three main parts: the mapper, the reducer with multi-output routing, and the main method that parses input/output table arguments.

Mapper: tokenize input records

TokenizerMapper emits each field value as a key with count 1.

public static class TokenizerMapper extends MapperBase {
    Record word;
    Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
        word = context.createMapOutputKeyRecord();
        one = context.createMapOutputValueRecord();
        one.set(new Object[] { 1L });
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
        for (int i = 0; i < record.getColumnCount(); i++) {
            word.set(new Object[] { record.get(i).toString() });
            context.write(word, one);
        }
    }
}
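Outside MaxCompute, the mapper's behavior can be sketched in plain Java (illustrative class name, no ODPS dependencies): every column value of a row is emitted as a (word, 1) pair.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapperSketch {
    // Emulates TokenizerMapper: each column of a row becomes a (word, 1) pair.
    static List<Map.Entry<String, Long>> emit(String[] row) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String column : row) {
            pairs.add(new AbstractMap.SimpleEntry<>(column, 1L));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // The single row of wc_in1 has the columns "hello" and "odps".
        System.out.println(emit(new String[] { "hello", "odps" }));  // prints [hello=1, odps=1]
    }
}
```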

Reducer: route output by label

SumReducer creates one output record per labeled destination in setup(). In reduce(), it routes each key to a destination based on count % 3. The cleanup() method writes a fixed record to each output after all keys are processed.

The key APIs for multi-output routing are:

  • context.createOutputRecord(): creates a record for the default (unlabeled) output

  • context.createOutputRecord("out1"): creates a record for the output labeled "out1"

  • context.write(result): writes to the default output

  • context.write(result1, "out1"): writes to the output labeled "out1"

public static class SumReducer extends ReducerBase {
    private Record result;
    private Record result1;
    private Record result2;

    @Override
    public void setup(TaskContext context) throws IOException {
        /** Create a record for each output and add labels to distinguish outputs. */
        result = context.createOutputRecord();          // default output (no label)
        result1 = context.createOutputRecord("out1");  // output labeled "out1"
        result2 = context.createOutputRecord("out2");  // output labeled "out2"
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
        long count = 0;
        while (values.hasNext()) {
            Record val = values.next();
            count += (Long) val.get(0);
        }
        long mod = count % 3;
        if (mod == 0) {
            result.set(0, key.get(0));
            result.set(1, count);
            /** If you do not specify a label, the default output is used. */
            context.write(result);
        } else if (mod == 1) {
            result1.set(0, key.get(0));
            result1.set(1, count);
            context.write(result1, "out1");
        } else {
            result2.set(0, key.get(0));
            result2.set(1, count);
            context.write(result2, "out2");
        }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
        Record result = context.createOutputRecord();
        result.set(0, "default");
        result.set(1, 1L);
        context.write(result);

        Record result1 = context.createOutputRecord("out1");
        result1.set(0, "out1");
        result1.set(1, 1L);
        context.write(result1, "out1");

        Record result2 = context.createOutputRecord("out2");
        result2.set(0, "out2");
        result2.set(1, 1L);
        context.write(result2, "out2");
    }
}

Main method: configure inputs and outputs

The main method parses input and output table strings from command-line arguments, then submits the job. Input entries use the format table_name or table_name|partition_spec. Output entries add an optional label: table_name|partition_spec|label.

/** Convert a partition spec string such as "ds=1/pt=2" into a LinkedHashMap. */
public static LinkedHashMap<String, String> convertPartSpecToMap(String partSpec) {
    LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
    if (partSpec != null && !partSpec.trim().isEmpty()) {
        String[] parts = partSpec.split("/");
        for (String part : parts) {
            String[] ss = part.split("=");
            if (ss.length != 2) {
                throw new RuntimeException(
                    "ODPS-0730001: error part spec format: " + partSpec);
            }
            map.put(ss[0], ss[1]);
        }
    }
    return map;
}
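The conversion can be checked standalone. The sketch below reproduces the same parsing logic outside the sample (illustrative class name, no MaxCompute dependencies):

```java
import java.util.LinkedHashMap;

public class PartSpecDemo {
    // Same parsing as convertPartSpecToMap: split on "/" into key=value pairs,
    // preserving insertion order.
    static LinkedHashMap<String, String> convert(String partSpec) {
        LinkedHashMap<String, String> map = new LinkedHashMap<>();
        if (partSpec != null && !partSpec.trim().isEmpty()) {
            for (String part : partSpec.split("/")) {
                String[] kv = part.split("=");
                if (kv.length != 2) {
                    throw new RuntimeException("error part spec format: " + partSpec);
                }
                map.put(kv[0], kv[1]);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(convert("a=1/b=1"));  // prints {a=1, b=1}
    }
}
```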

public static void main(String[] args) throws Exception {
    String[] inputs = null;
    String[] outputs = null;
    if (args.length == 2) {
        inputs = args[0].split(",");
        outputs = args[1].split(",");
    } else {
        System.err.println("MultipleInOut in... out...");
        System.exit(1);
    }

    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));

    /** Parse input table strings. */
    for (String in : inputs) {
        String[] ss = in.split("\\|");
        if (ss.length == 1) {
            InputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
        } else if (ss.length == 2) {
            LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
            InputUtils.addTable(
                TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
        } else {
            System.err.println("Style of input: " + in + " is not right");
            System.exit(1);
        }
    }

    /** Parse output table strings. */
    for (String out : outputs) {
        String[] ss = out.split("\\|");
        if (ss.length == 1) {
            OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
        } else if (ss.length == 2) {
            LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
            OutputUtils.addTable(
                TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
        } else if (ss.length == 3) {
            if (ss[1].isEmpty()) {
                /** An empty second field ("table||spec") treats the third field as the partition spec. */
                LinkedHashMap<String, String> map = convertPartSpecToMap(ss[2]);
                OutputUtils.addTable(
                    TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
            } else {
                LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
                OutputUtils.addTable(
                    TableInfo.builder().tableName(ss[0]).partSpec(map)
                        .label(ss[2]).build(), job);
            }
        } else {
            System.err.println("Style of output: " + out + " is not right");
            System.exit(1);
        }
    }

    JobClient.runJob(job);
}

Full imports for this example:

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;