MaxCompute:MultiJobs example

Last Updated:Mar 26, 2026

This example shows how to chain multiple MapReduce jobs sequentially in MaxCompute. A single MapReduce job cannot invoke another job at runtime, so when your processing logic requires iterative execution — where each iteration depends on the output of the previous one — you must orchestrate multiple jobs from a client-side main method. This example uses a counter to control the iteration loop.

How it works

The program runs two types of jobs in sequence:

  1. Init job (InitMapper): Writes an initial counter value (2) to the output table.

  2. Decrease jobs (DecreaseMapper): Run in a loop. Each iteration reads the previous output from a resource table, decrements the value by 1, and sets a counter. The loop exits when the counter reaches 0.

Job-to-job data passing uses two mechanisms:

  • Resource table (multijobs_res_table): Carries the numeric value between jobs.

  • Counter (multijobs.value): Signals the main method whether to continue iterating.

The main method is the sole orchestrator: it submits jobs in order and checks completion between each step.
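The handoff between jobs can be sketched in plain Java, with no MaxCompute SDK involved. In this illustrative sketch (all names are ours, not part of the MaxCompute API), `resourceValue` stands in for the content of multijobs_res_table and the return value of `runDecreaseJob` stands in for the multijobs.value counter read by the orchestrator:

```java
public class CounterHandoff {
    // Simulated resource-table content carried from one job to the next.
    static long resourceValue;

    // Simulated decrease job: read the carried value, decrement it,
    // write it back, and publish it as the counter.
    static long runDecreaseJob() {
        resourceValue--;
        return resourceValue;
    }

    // Simulated orchestration: the init job writes 2, then decrease jobs
    // run until the counter reaches 0. Returns how many decrease jobs ran.
    static int orchestrate() {
        resourceValue = 2;            // init job writes the starting value
        int jobsRun = 0;
        while (true) {
            long counter = runDecreaseJob();
            jobsRun++;
            if (counter == 0) {       // exit condition checked by the driver
                break;
            }
        }
        return jobsRun;
    }

    public static void main(String[] args) {
        System.out.println(orchestrate() + " decrease jobs, final value " + resourceValue);
    }
}
```

Running the sketch shows two decrease jobs and a final value of 0, matching the expected result of the real program below.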

Prerequisites

Before you begin, ensure that you have:

  • Completed the environment setup described in Getting started

  • Built the mapreduce-examples.jar package and placed it in bin\data\resources under the MaxCompute client installation path

Prepare tables and resources

  1. Create the test tables:

    CREATE TABLE mr_empty (key STRING, value STRING);
    CREATE TABLE mr_multijobs_out (value BIGINT);
  2. Register the resources used by the job:

    add table mr_multijobs_out as multijobs_res_table -f;
    
    -- The -f flag overwrites an existing resource with the same name; it is optional the first time a resource is added.
    add jar data\resources\mapreduce-examples.jar -f;

    mr_multijobs_out is registered as multijobs_res_table so DecreaseMapper can read the previous job's output as a resource table. The JAR is registered so the MaxCompute client can locate the class at runtime.

Run MultiJobs

Run the following command on the MaxCompute client:

jar -resources mapreduce-examples.jar,multijobs_res_table -classpath data\resources\mapreduce-examples.jar \
    com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;

Command parameters:

  • -resources mapreduce-examples.jar,multijobs_res_table: Resources available to the job, that is, the JAR package and the resource table carrying the previous job's output.

  • -classpath data\resources\mapreduce-examples.jar: Local JAR path the client uses to locate the main class.

  • Main class com.aliyun.odps.mapred.open.example.MultiJobs: Entry point of the program.

  • Argument mr_multijobs_out: Output table name passed to main.

Expected result

After the job completes, mr_multijobs_out contains one record:

+------------+
| value      |
+------------+
| 0          |
+------------+

Sample code

For Project Object Model (POM) dependency configuration, see the Precautions section in the Getting started guide.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
 * MultiJobs
 *
 * Runs multiple MapReduce jobs in sequence.
 **/
public class MultiJobs {
    public static class InitMapper extends MapperBase {
        @Override
        public void setup(TaskContext context) throws IOException {
            Record record = context.createOutputRecord();
            long v = context.getJobConf().getLong("multijobs.value", 2);
            record.set(0, v);
            context.write(record);
        }
    }
    public static class DecreaseMapper extends MapperBase {
        @Override
        public void cleanup(TaskContext context) throws IOException {
            /** Obtain the variable values that are defined in the main function from JobConf. */
            long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
            long v = -1;
            int count = 0;
            /** Read the data from the output table of the previous job. */
            Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
            while (iter.hasNext()) {
                Record r = iter.next();
                v = (Long) r.get(0);
                if (expect != v) {
                    throw new IOException("expect: " + expect + ", but: " + v);
                }
                count++;
            }
            if (count != 1) {
                throw new IOException("res_table should have 1 record, but: " + count);
            }
            Record record = context.createOutputRecord();
            v--;
            record.set(0, v);
            context.write(record);
            /** Set the counter. The counter value can be obtained in the main function after the job is completed. */
            context.getCounter("multijobs", "value").setValue(v);
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: MultiJobs <table>");
            System.exit(1);
        }
        String tbl = args[0];
        long iterCount = 2;
        System.err.println("Start to run init job.");
        JobConf initJob = new JobConf();
        initJob.setLong("multijobs.value", iterCount);
        initJob.setMapperClass(InitMapper.class);
        InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
        OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
        initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
        initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
        /** Explicitly set the number of reducers to 0 for map-only jobs. */
        initJob.setNumReduceTasks(0);
        JobClient.runJob(initJob);
        while (true) {
            System.err.println("Start to run iter job, count: " + iterCount);
            JobConf decJob = new JobConf();
            decJob.setLong("multijobs.expect.value", iterCount);
            decJob.setMapperClass(DecreaseMapper.class);
            InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
            OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
            /** Explicitly set the number of reducers to 0 for map-only jobs. */
            decJob.setNumReduceTasks(0);
            RunningJob rJob = JobClient.runJob(decJob);
            iterCount--;
            /** Exit the loop when the counter set by DecreaseMapper reaches 0. */
            if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
                break;
            }
        }
        if (iterCount != 0) {
            throw new IOException("Job failed.");
        }
    }
}

Code walk-through

InitMapper

Runs once in setup(). Reads the initial counter value (multijobs.value, default 2) from JobConf and writes it to the output table. This is a map-only job — setNumReduceTasks(0) prevents the framework from launching any reducers.

DecreaseMapper

Runs in cleanup() at the end of each iteration. It:

  1. Reads the expected value (multijobs.expect.value) from JobConf.

  2. Reads the single record from multijobs_res_table using context.readResourceTable(). Throws IOException if the record count is not exactly 1, or if the value does not match the expected value.

  3. Decrements the value by 1, writes it to the output table, and sets the counter (multijobs.value) to the new value.
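The validation logic in those steps can be isolated as a small plain-Java function. This is an illustrative sketch, not the MaxCompute API: `decrease` receives the expected value and the resource-table records as a plain list, and mirrors the checks that DecreaseMapper performs in cleanup():

```java
import java.io.IOException;
import java.util.List;

public class DecreaseStep {
    // Validates the carried records (exactly one record, whose value must
    // equal `expect`), then returns the decremented value.
    static long decrease(long expect, List<Long> resourceRecords) throws IOException {
        long v = -1;
        int count = 0;
        for (long r : resourceRecords) {
            v = r;
            if (v != expect) {
                throw new IOException("expect: " + expect + ", but: " + v);
            }
            count++;
        }
        if (count != 1) {
            throw new IOException("res_table should have 1 record, but: " + count);
        }
        return v - 1;  // written to the output table and set as the counter
    }

    public static void main(String[] args) throws IOException {
        System.out.println(decrease(2, List.of(2L)));
    }
}
```

Failing either check throws IOException, which fails the job; the main method then surfaces the failure rather than looping forever on bad data.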

main method

Controls the job sequence:

  1. Submits the init job and waits for it to complete.

  2. Enters a loop that submits decrease jobs one at a time, waiting for each to finish before proceeding.

  3. After each decrease job, checks the multijobs.value counter. If it equals 0, exits the loop.

  4. Verifies that iterCount is 0 after the loop; throws IOException if not, signaling a job failure.
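The loop above can be traced step by step in plain Java (illustrative only, no SDK): with an initial value of 2, main passes expect values 2 and then 1 to successive decrease jobs before the counter reaches 0.

```java
import java.util.ArrayList;
import java.util.List;

public class MainLoopTrace {
    // Returns the multijobs.expect.value passed to each decrease job.
    static List<Long> trace() {
        long iterCount = 2;              // initial value written by the init job
        long tableValue = iterCount;     // simulated output-table content
        List<Long> expects = new ArrayList<>();
        while (true) {
            expects.add(iterCount);      // expect value for this decrease job
            tableValue--;                // DecreaseMapper decrements and rewrites
            long counter = tableValue;   // counter reported back to main
            iterCount--;
            if (counter == 0) {          // main's exit condition
                break;
            }
        }
        return expects;                  // two decrease jobs ran
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The trace ends with iterCount at 0, which is why the final consistency check in main passes and the output table holds the single value 0.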