All Products
Search
Document Center

MaxCompute: Join example

Last Updated: Mar 26, 2026

The MaxCompute MapReduce framework does not support Join operations natively. To join data from two tables, implement a custom map function and reduce function.

This example performs an inner join on a shared key across two input tables: mr_Join_src1(key BIGINT, value STRING) and mr_Join_src2(key BIGINT, value STRING). The output table mr_Join_out(key BIGINT, value1 STRING, value2 STRING) contains one row for each pair of left-table and right-table records that share the same key; keys that appear in only one input table produce no output.

Prerequisites

Before you begin, ensure that you have:

  • Completed the environment setup described in Getting started

  • The mapreduce-examples.jar package, stored in the bin\data\resources directory of the MaxCompute client installation path

How it works

The join uses a reduce-side join pattern:

  • The mapper reads from two labeled input tables (left and right) and appends a tag (0 for left, 1 for right) to each output key. The composite key schema is key:BIGINT, tag:BIGINT.

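  • MaxCompute partitions and groups records by key alone, and sorts them by the composite key (key, tag). Because tag=0 (left table) sorts before tag=1 (right table), the left-table records for a given key always reach the reduce function before the right-table records for that key.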

  • The reducer buffers all left-table records in memory for a given key, then joins each right-table record against every buffered left-table record.

Important

This approach loads all left-table records for a given key into memory. Do not use this code in production — it is provided as a reference implementation only.

Set up tables and resources

  1. Create the input and output tables.

    CREATE TABLE mr_Join_src1(key BIGINT, value STRING);
    CREATE TABLE mr_Join_src2(key BIGINT, value STRING);
    CREATE TABLE mr_Join_out(key BIGINT, value1 STRING, value2 STRING);
  2. Add the JAR package. If this is the first time you are adding the JAR, you can omit the -f flag.

    add jar data\resources\mapreduce-examples.jar -f;
  3. Import data from data1.txt and data2.txt (located in the bin directory of the MaxCompute client) into the two input tables using Tunnel.

    tunnel upload data1.txt mr_Join_src1;
    tunnel upload data2.txt mr_Join_src2;

    mr_Join_src1 contains the following data:

    1,hello
    2,odps

    mr_Join_src2 contains the following data:

    1,odps
    3,hello
    4,odps

    The following table shows which keys match across both tables:

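    | Key | In mr_Join_src1 | In mr_Join_src2 | Produces output |
    |-----|-----------------|-----------------|-----------------|
    | 1   | hello           | odps            | Yes             |
    | 2   | odps            | -               | No              |
    | 3   | -               | hello           | No              |
    | 4   | -               | odps            | No              |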

    Only key=1 appears in both tables and produces an output row.

Run the join

Run the following command on the MaxCompute client:

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.Join mr_Join_src1 mr_Join_src2 mr_Join_out;

The arguments are <input table 1> <input table 2> <output table>.

Verify the result

After the job completes, mr_Join_out contains the following data. value1 is from mr_Join_src1 and value2 is from mr_Join_src2.

+------------+------------+------------+
| key        | value1     | value2     |
+------------+------------+------------+
|  1         | hello      |  odps      |
+------------+------------+------------+

Sample code

Important

This sample code has poor performance and is provided as a reference implementation only. Do not use it in production.

For Project Object Model (POM) dependencies, see Precautions.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
 * Reduce-side inner join example.
 *
 * <p>Inputs: mr_Join_src1 and mr_Join_src2, both (key bigint, value string).
 * Output: mr_Join_out (key bigint, value1 string, value2 string).
 *
 * <p>The reducer keeps every left-table row of a key in memory, so this class
 * is a reference implementation only and is not recommended for production.
 */
public class Join {
    public static final Log LOG = LogFactory.getLog(Join.class);

    /**
     * Re-keys every input row as (join key, tag), where the tag is 0 for rows
     * read from the input labeled "left" and 1 for rows from any other input.
     */
    public static class JoinMapper extends MapperBase {
        private Record outKey;
        private Record outValue;
        private long sourceTag;

        /**
         * Allocates the reusable output records and derives the tag from the
         * label of the input table this task reads.
         *
         * @param context task context supplied by the framework
         * @throws IOException declared by the overridden method
         */
        @Override
        public void setup(TaskContext context) throws IOException {
            outKey = context.createMapOutputKeyRecord();
            outValue = context.createMapOutputValueRecord();
            String label = context.getInputTableInfo().getLabel();
            if (label.equals("left")) {
                sourceTag = 0;
            } else {
                sourceTag = 1;
            }
        }

        /**
         * Writes one (key, tag) -> (remaining columns) pair per input row.
         *
         * @param key     record number passed by the framework; not used here
         * @param record  input row; column 0 is the join key
         * @param context task context used to emit the pair
         * @throws IOException if writing the pair fails
         */
        @Override
        public void map(long key, Record record, TaskContext context) throws IOException {
            outKey.set(0, record.get(0));
            outKey.set(1, sourceTag);
            // Every column after the join key is shifted left by one into the value record.
            int target = 0;
            for (int source = 1; source < record.getColumnCount(); source++) {
                outValue.set(target++, record.get(source));
            }
            context.write(outKey, outValue);
        }
    }

    /**
     * Joins the left and right rows of one key group: left rows are buffered,
     * then each right row is paired with every buffered left row.
     */
    public static class JoinReducer extends ReducerBase {
        private Record joined = null;

        /**
         * Allocates the reusable output record.
         *
         * @param context task context supplied by the framework
         * @throws IOException declared by the overridden method
         */
        @Override
        public void setup(TaskContext context) throws IOException {
            joined = context.createOutputRecord();
        }

        /**
         * Processes all values that share the same join key.
         *
         * <p>The job sorts on (key, tag), so rows tagged 0 (left table) are
         * expected to be seen before rows tagged 1 (right table).
         *
         * @param key     composite key; column 0 is the join key, column 1 the tag
         * @param values  value records of this key group
         * @param context task context used to emit joined rows
         * @throws IOException if writing a joined row fails
         */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long joinKey = key.getBigint(0);
            List<Object[]> bufferedLeft = new ArrayList<Object[]>();
            while (values.hasNext()) {
                Record current = values.next();
                // The tag is re-read from the key for every value.
                // NOTE(review): presumably the framework refreshes the key record as
                // the iterator advances - confirm against the SDK documentation.
                long tag = (Long) key.get(1);
                if (tag == 0) {
                    // Left-table row: keep a copy in memory for later pairing.
                    bufferedLeft.add(current.toArray().clone());
                    continue;
                }
                // Right-table row: pair it with every buffered left-table row.
                // This has poor performance and is meant as an example only.
                for (Object[] leftRow : bufferedLeft) {
                    emitJoinedRow(joinKey, leftRow, current, context);
                }
            }
        }

        /** Fills the output record with key, left columns, right columns and writes it. */
        private void emitJoinedRow(long joinKey, Object[] leftRow, Record rightRow,
                TaskContext context) throws IOException {
            int column = 0;
            joined.set(column++, joinKey);
            for (Object leftCell : leftRow) {
                joined.set(column++, leftCell);
            }
            for (int i = 0; i < rightRow.getColumnCount(); i++) {
                joined.set(column++, rightRow.get(i));
            }
            context.write(joined);
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args exactly three table names: left input, right input, output
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: Join <input table1> <input table2> <out>");
            System.exit(2);
        }

        JobConf conf = new JobConf();
        conf.setMapperClass(JoinMapper.class);
        conf.setReducerClass(JoinReducer.class);

        // Intermediate schema: composite key (key, tag) and a single string value.
        conf.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,tag:bigint"));
        conf.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));

        // Partition and group on the join key only; sort on (key, tag) so that
        // left-table rows precede right-table rows inside each group.
        conf.setPartitionColumns(new String[]{"key"});
        conf.setOutputKeySortColumns(new String[]{"key", "tag"});
        conf.setOutputGroupingColumns(new String[]{"key"});
        conf.setNumReduceTasks(1);

        TableInfo leftInput = TableInfo.builder().tableName(args[0]).label("left").build();
        InputUtils.addTable(leftInput, conf);
        TableInfo rightInput = TableInfo.builder().tableName(args[1]).label("right").build();
        InputUtils.addTable(rightInput, conf);
        TableInfo output = TableInfo.builder().tableName(args[2]).build();
        OutputUtils.addTable(output, conf);

        JobClient.runJob(conf);
    }
}