The MaxCompute MapReduce framework does not support join logic natively. However, you can implement the join yourself in your own map/reduce functions, although this requires some extra work.

Suppose you need to join two tables, mr_Join_src1(key bigint, value string) and mr_Join_src2(key bigint, value string), and the output table is mr_Join_out(key bigint, value1 string, value2 string), where value1 is the value from mr_Join_src1 and value2 is the value from mr_Join_src2.


  1. Prepare the JAR package for the test program, assuming the name is mapreduce-examples.jar and the local storage path is data\resources.
  2. Prepare tables and resources for testing the Join operation.
    • Create tables.
      create table mr_Join_src1(key bigint, value string);
      create table mr_Join_src2(key bigint, value string);
      create table mr_Join_out(key bigint, value1 string,value2 string);
    • Add resources.
      add jar data\resources\mapreduce-examples.jar -f;
  3. Run tunnel to import data.
    tunnel upload data1 mr_Join_src1;
    tunnel upload data2 mr_Join_src2;
    The contents of data1, imported into mr_Join_src1, are as follows:
     1, hello
     2, ODPS
    The contents of data2, imported into mr_Join_src2, are as follows:
    1, ODPS
    4, ODPS

Test procedure:

Run the Join job in odpscmd as follows:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar mr_Join_src1 mr_Join_src2 mr_Join_out;

Expected Results

After the job has completed successfully, the contents of the output table mr_Join_out are as follows:
| key | value1 | value2 |
| 1 | hello | ODPS |

Sample code

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.MapperBase;
    import com.aliyun.odps.mapred.ReducerBase;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;
    /**
     * Join, mr_Join_src1/mr_Join_src2(key bigint, value string), mr_Join_out(key
     * bigint, value1 string, value2 string)
     */
    public class Join {
      public static final Log LOG = LogFactory.getLog(Join.class);
      public static class JoinMapper extends MapperBase {
        private Record mapkey;
        private Record mapvalue;
        private long tag;
        public void setup(TaskContext context) throws IOException{
          mapkey = context.createMapOutputKeyRecord();
          mapvalue = context.createMapOutputValueRecord();
          tag = context.getInputTableInfo().getLabel().equals("left") ? 0: 1;
        public void map(long key,Record record, TaskContext context)
            throws IOException {
          for (int i = 1; i< record.getColumnCount();i++) {
            mapvalue.set(i -1, record.get(i));
      public static class JoinReducer extends ReducerBase {
        private Record result = null;
        public void setup(TaskContext context) throws IOException{
          result = context.createOutputRecord();
        // All records passed to a single call of the reduce function share the same key.
        public void reduce(Record key,Iterator<Record>values,TaskContext context)
            throws IOException {
          long k = key.getBigint(0);
          List<Object[]> leftValues = new ArrayList<Object[]>();
          // Because the sort columns are set to the key + tag combination, records from the left table (tag 0) are guaranteed to come before records from the right table in the input of the reduce function.
          while(values.hasNext()) {
            Record value =;
            long tag = (Long)key.get(1);
            // The data for the left table is first cached into memory
            if (tag == 0) {
            }else {
              // When a right-table record is encountered, join it with every left-table record and output the result; at this point all left-table records are already cached in memory.
              // This implementation is only a functional demonstration; its performance is relatively low and it is not recommended for production use.
              for (Object[] leftValue :leftValues) {
                int index = 0;
                for (int i = 0;i<leftValue.length;i++) {
                for (int i = 0;i< value.getColumnCount();i++) {
      public static void main(String[] args) throws Exception {
        if (args.length != 3) {
          System.err.println("Usage: Join <input table1> <input table2> <out>");
        JobConf job = new JobConf();
        job.setPartitionColumns(new String[]{"key"});
        job.setOutputKeySortColumns(new String[]{"key", "tag"});
        job.setOutputGroupingColumns(new String[]{"key"});
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).label("left").build(), job);
        InputUtils.addTable(TableInfo.builder().tableName(args[1]).label("right").build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[2]).build(), job);
        JobClient.runJob(job);