This example shows how to chain multiple MapReduce jobs sequentially in MaxCompute. A single MapReduce job cannot invoke another job at runtime, so when your processing logic requires iterative execution — where each iteration depends on the output of the previous one — you must orchestrate multiple jobs from a client-side main method. This example uses a counter to control the iteration loop.
How it works
The program runs two types of jobs in sequence:
- Init job (InitMapper): Writes an initial counter value (2) to the output table.
- Decrease jobs (DecreaseMapper): Run in a loop. Each iteration reads the previous output from a resource table, decrements the value by 1, and sets a counter. The loop exits when the counter reaches 0.
Job-to-job data passing uses two mechanisms:
- Resource table (multijobs_res_table): Carries the numeric value between jobs.
- Counter (multijobs.value): Signals the main method whether to continue iterating.
The main method is the sole orchestrator: it submits jobs in order and checks completion between each step.
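The data flow described above can be traced with a small plain-Java simulation. This is only a sketch: it uses no MaxCompute classes, and the names JobChainSketch, table, counter, initJob, and decreaseJob are hypothetical stand-ins for the output table, the multijobs.value counter, and the two job types. It assumes a positive starting value.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the job chain. No MaxCompute classes are involved.
class JobChainSketch {
    static long table;   // stands in for the single record in mr_multijobs_out
    static long counter; // stands in for the multijobs.value counter

    // Stands in for the init job: writes the starting value to the table.
    static void initJob(long initial) {
        table = initial;
    }

    // Stands in for one decrease job: reads the previous output,
    // writes value - 1, and publishes the new value as the counter.
    static void decreaseJob() {
        long v = table - 1;
        table = v;
        counter = v;
    }

    // Client-side orchestration: returns the table contents after each job.
    static List<Long> run(long initial) {
        List<Long> trace = new ArrayList<>();
        initJob(initial);
        trace.add(table);
        do {
            decreaseJob();
            trace.add(table);
        } while (counter != 0); // the counter decides whether to iterate again
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // prints [2, 1, 0]
    }
}
```

With a starting value of 2, two decrease jobs run and the table ends at 0, which matches the result this example is expected to produce.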
Prerequisites
Before you begin, ensure that you have:
- Completed the environment setup described in Getting started
- Built the mapreduce-examples.jar package and placed it in bin\data\resources under the MaxCompute client installation path
Prepare tables and resources
- Create the test tables:
    CREATE TABLE mr_empty (key STRING, value STRING);
    CREATE TABLE mr_multijobs_out (value BIGINT);
- Register the resources used by the job:
    add table mr_multijobs_out as multijobs_res_table -f;
    -- Omit -f when adding the JAR for the first time.
    add jar data\resources\mapreduce-examples.jar -f;
  mr_multijobs_out is registered as multijobs_res_table so DecreaseMapper can read the previous job's output as a resource table. The JAR is registered so the MaxCompute client can locate the class at runtime.
Run MultiJobs
Run the following command on the MaxCompute client:
jar -resources mapreduce-examples.jar,multijobs_res_table -classpath data\resources\mapreduce-examples.jar \
com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;
Command parameters:
| Parameter | Value | Description |
|---|---|---|
| -resources | mapreduce-examples.jar,multijobs_res_table | Resources available to the job: the JAR package and the resource table carrying the previous job's output |
| -classpath | data\resources\mapreduce-examples.jar | Local JAR path the client uses to locate the main class |
| Main class | com.aliyun.odps.mapred.open.example.MultiJobs | Entry point of the program |
| Argument | mr_multijobs_out | Output table name passed to main |
Expected result
After the job completes, mr_multijobs_out contains one record:
+------------+
| value |
+------------+
| 0 |
+------------+
Sample code
For Project Object Model (POM) dependency configuration, see the Precautions section in the Getting started guide.
package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * MultiJobs
 *
 * Runs multiple MapReduce jobs in sequence from one client-side main method.
 */
public class MultiJobs {

    /** Mapper of the init job: writes the initial value to the output table. */
    public static class InitMapper extends MapperBase {
        @Override
        public void setup(TaskContext context) throws IOException {
            Record record = context.createOutputRecord();
            long v = context.getJobConf().getLong("multijobs.value", 2);
            record.set(0, v);
            context.write(record);
        }
    }

    /** Mapper of each decrease job: reads the previous value and writes value - 1. */
    public static class DecreaseMapper extends MapperBase {
        @Override
        public void cleanup(TaskContext context) throws IOException {
            // Obtain the expected value that the main method stored in JobConf.
            long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
            long v = -1;
            int count = 0;
            // Read the data from the output table of the previous job.
            Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
            while (iter.hasNext()) {
                Record r = iter.next();
                v = (Long) r.get(0);
                if (expect != v) {
                    throw new IOException("expect: " + expect + ", but: " + v);
                }
                count++;
            }
            if (count != 1) {
                throw new IOException("res_table should have 1 record, but: " + count);
            }
            Record record = context.createOutputRecord();
            v--;
            record.set(0, v);
            context.write(record);
            // Set the counter. The main method reads its value after the job completes.
            context.getCounter("multijobs", "value").setValue(v);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: MultiJobs <table>");
            System.exit(1);
        }
        String tbl = args[0];
        long iterCount = 2;

        System.err.println("Start to run init job.");
        JobConf initJob = new JobConf();
        initJob.setLong("multijobs.value", iterCount);
        initJob.setMapperClass(InitMapper.class);
        InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
        OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
        initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
        initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
        // Explicitly set the number of reducers to 0 for map-only jobs.
        initJob.setNumReduceTasks(0);
        JobClient.runJob(initJob);

        while (true) {
            System.err.println("Start to run iter job, count: " + iterCount);
            JobConf decJob = new JobConf();
            decJob.setLong("multijobs.expect.value", iterCount);
            decJob.setMapperClass(DecreaseMapper.class);
            InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
            OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
            // Explicitly set the number of reducers to 0 for map-only jobs.
            decJob.setNumReduceTasks(0);
            RunningJob rJob = JobClient.runJob(decJob);
            iterCount--;
            // Exit the loop once the counter set by DecreaseMapper reaches 0.
            if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
                break;
            }
        }

        // The client-side count and the job-side counter must agree at the end.
        if (iterCount != 0) {
            throw new IOException("Job failed.");
        }
    }
}
Code walk-through
InitMapper
Does its work once in setup() rather than in map(). It reads the initial value (multijobs.value, default 2) from JobConf and writes it to the output table. This is a map-only job: setNumReduceTasks(0) prevents the framework from launching any reducers.
DecreaseMapper
Does its work in cleanup(), which runs at the end of the map task in each decrease job. It:
- Reads the expected value (multijobs.expect.value) from JobConf.
- Reads the single record from multijobs_res_table using context.readResourceTable(). Throws IOException if the record count is not exactly 1, or if the value does not match the expected value.
- Decrements the value by 1, writes it to the output table, and sets the counter (multijobs.value) to the new value.
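The validation steps above can be exercised in isolation with a plain-Java sketch. It assumes the resource table rows are represented as plain Long values rather than MaxCompute Record objects, and the names DecreaseStepSketch and decrease are hypothetical. It is an illustration of the checks, not the SDK code itself.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

// Mirrors the checks in DecreaseMapper.cleanup() without any MaxCompute classes.
class DecreaseStepSketch {
    // rows stands in for the iterator returned by readResourceTable();
    // expect stands in for multijobs.expect.value.
    static long decrease(Iterator<Long> rows, long expect) throws IOException {
        long v = -1;
        int count = 0;
        while (rows.hasNext()) {
            v = rows.next();
            if (expect != v) {
                throw new IOException("expect: " + expect + ", but: " + v);
            }
            count++;
        }
        if (count != 1) {
            throw new IOException("res_table should have 1 record, but: " + count);
        }
        // The decremented value is what the job writes and reports as the counter.
        return v - 1;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(decrease(Arrays.asList(2L).iterator(), 2)); // prints 1
    }
}
```

An empty table, a table with more than one row, or a value that differs from the expected one all raise IOException. In the real job, this is how a stale or missing resource table shows up as a failed job.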
main method
Controls the job sequence:
- Submits the init job and waits for it to complete.
- Enters a loop that submits decrease jobs one at a time, waiting for each to finish before proceeding.
- After each decrease job, checks the multijobs.value counter. If it equals 0, exits the loop.
- Verifies that iterCount is 0 after the loop; throws IOException if not, signaling a job failure.
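The loop and the final consistency check can be sketched without the SDK as follows. Here submitAndGetCounter is a hypothetical stand-in for calling JobClient.runJob and then reading the multijobs.value counter, and IterationLoopSketch and runLoop are illustrative names. This is a sketch under those assumptions, not the SDK's API.

```java
import java.io.IOException;
import java.util.function.LongSupplier;

// Client-side control flow only. Each call to the supplier represents one decrease job.
class IterationLoopSketch {
    static int runLoop(long start, LongSupplier submitAndGetCounter) throws IOException {
        long iterCount = start; // the client's own view of the remaining iterations
        int jobs = 0;
        while (true) {
            long counter = submitAndGetCounter.getAsLong(); // blocks until the job ends
            jobs++;
            iterCount--;
            if (counter == 0) {
                break;
            }
        }
        // The job-side counter and the client-side count must reach 0 together.
        if (iterCount != 0) {
            throw new IOException("Job failed.");
        }
        return jobs;
    }

    public static void main(String[] args) throws IOException {
        long[] value = {2}; // simulated table contents
        int jobs = runLoop(2, () -> --value[0]);
        System.out.println("jobs run: " + jobs); // prints "jobs run: 2"
    }
}
```

Keeping a separate client-side count lets main detect the case where the counter reports 0 earlier than expected, instead of accepting whatever the job reports.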