In a map-only job, the mapper writes output records directly to a MaxCompute table — no reducer runs. Unlike standard MapReduce, you only need to specify output tables, not key-value metadata for the mapper output.
This example demonstrates three things:
- How to configure a map-only job by setting the reducer count to 0
- How to pass parameters through JobConf and read them inside the mapper
- How the setup, map, and cleanup lifecycle methods work with conditional execution
Prerequisites
Before you begin, complete the environment setup described in Getting started.
Prepare test tables and resources
Create the input and output tables.
```sql
CREATE TABLE wc_in (key STRING, value STRING);
CREATE TABLE wc_out (key STRING, cnt BIGINT);
```

Add the JAR package as a resource.

```
add jar data\resources\mapreduce-examples.jar -f;
```

Omit `-f` the first time you add the JAR package. The path `data\resources\mapreduce-examples.jar` is relative to the `bin` directory of your local MaxCompute client installation.

Import test data into `wc_in` using Tunnel. Run the following command from the `bin` directory of the MaxCompute client, where `data.txt` is located.

```
tunnel upload data.txt wc_in;
```

The command loads the following rows into `wc_in`:

```
hello,odps
hello,odps
```
Run the job
Run the following command in the MaxCompute client:
```
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.MapOnly wc_in wc_out map
```

The command arguments map as follows:
| Argument | Description |
|---|---|
| `-resources mapreduce-examples.jar` | Declares the JAR package as a job dependency |
| `-classpath data\resources\mapreduce-examples.jar` | Specifies the path to the JAR package |
| `wc_in` | Input table |
| `wc_out` | Output table |
| `map` | Sets `option.mapper.map=true` in JobConf, enabling the `map()` method to write output |
Expected result
After the job completes, query wc_out:
```
+------------+------------+
| key        | cnt        |
+------------+------------+
| hello      | 1          |
| hello      | 1          |
+------------+------------+
```

The table contains two rows instead of one because a map-only job produces one output record per input record with no aggregation. Each `hello,odps` input row maps to a `hello | 1` output row.
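The two-row result can be reproduced without the framework. The sketch below is a framework-free illustration of the map-only pass (class and method names are illustrative, not part of the MaxCompute SDK): each input line is mapped independently, and because no reduce step runs, duplicate inputs stay duplicated in the output.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of a map-only pass: one output record per input
// record, no aggregation step to merge duplicates.
public class MapOnlySketch {
    // Mirrors the sample's map(): take the first comma-separated field, pair it with 1.
    static String[] mapRecord(String line) {
        String key = line.split(",")[0];
        return new String[] { key, "1" };
    }

    static List<String[]> runMapOnly(List<String> input) {
        List<String[]> out = new ArrayList<>();
        for (String line : input) {
            out.add(mapRecord(line)); // one output per input, nothing is combined
        }
        return out;
    }

    public static void main(String[] args) {
        for (String[] r : runMapOnly(List.of("hello,odps", "hello,odps"))) {
            System.out.println(r[0] + " | " + r[1]); // prints "hello | 1" twice
        }
    }
}
```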
Sample code
For Project Object Model (POM) dependencies, see Precautions.
```java
package com.aliyun.odps.mapred.open.example;

import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;

public class MapOnly {

    public static class MapperClass extends MapperBase {

        @Override
        public void setup(TaskContext context) throws IOException {
            boolean is = context.getJobConf().getBoolean("option.mapper.setup", false);
            // Write a record from setup() only if option.mapper.setup is true in the JobConf.
            if (is) {
                Record result = context.createOutputRecord();
                result.set(0, "setup");
                result.set(1, 1L);
                context.write(result);
            }
        }

        @Override
        public void map(long key, Record record, TaskContext context) throws IOException {
            boolean is = context.getJobConf().getBoolean("option.mapper.map", false);
            // Write a record from map() only if option.mapper.map is true in the JobConf.
            if (is) {
                Record result = context.createOutputRecord();
                result.set(0, record.get(0));
                result.set(1, 1L);
                context.write(result);
            }
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
            boolean is = context.getJobConf().getBoolean("option.mapper.cleanup", false);
            // Write a record from cleanup() only if option.mapper.cleanup is true in the JobConf.
            if (is) {
                Record result = context.createOutputRecord();
                result.set(0, "cleanup");
                result.set(1, 1L);
                context.write(result);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2 && args.length != 3) {
            System.err.println("Usage: MapOnly <in_table> <out_table> [setup|map|cleanup]");
            System.exit(2);
        }
        JobConf job = new JobConf();
        job.setMapperClass(MapperClass.class);
        // For map-only jobs, the number of reducers must be explicitly set to 0.
        job.setNumReduceTasks(0);
        // Wire the input and output tables to the job.
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        if (args.length == 3) {
            String options = args[2];
            // Set key-value pairs in the JobConf; the mapper reads them back
            // through context.getJobConf().
            if (options.contains("setup")) {
                job.setBoolean("option.mapper.setup", true);
            }
            if (options.contains("map")) {
                job.setBoolean("option.mapper.map", true);
            }
            if (options.contains("cleanup")) {
                job.setBoolean("option.mapper.cleanup", true);
            }
        }
        JobClient.runJob(job);
    }
}
```

How the code works
`main()` — job configuration
job.setNumReduceTasks(0) is required for map-only jobs. Without it, the framework expects a reducer and the job fails. InputUtils.addTable and OutputUtils.addTable wire the input and output tables to the job using the command-line arguments.
The optional third argument (setup, map, or cleanup) sets the corresponding boolean flag in JobConf. Each lifecycle method reads its own flag via context.getJobConf().getBoolean(...) and writes output only when the flag is true.
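The flag plumbing can be isolated from the framework. In the sketch below, a plain `Map` is a hypothetical stand-in for JobConf's string-keyed store (`setBoolean`/`getBoolean`); the parsing mirrors the substring checks in the sample's `main()`, so a combined value such as `setup,map` enables more than one lifecycle method.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of how the optional third argument becomes per-method flags.
// The Map stands in for JobConf's key-value store.
public class OptionFlags {
    static Map<String, Boolean> parse(String options) {
        Map<String, Boolean> conf = new HashMap<>();
        // Same substring matching as the sample's main().
        for (String phase : new String[] { "setup", "map", "cleanup" }) {
            if (options.contains(phase)) {
                conf.put("option.mapper." + phase, true);
            }
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Boolean> conf = parse("setup,map");
        System.out.println(conf.getOrDefault("option.mapper.map", false));     // true
        System.out.println(conf.getOrDefault("option.mapper.cleanup", false)); // false
    }
}
```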
`setup()` — runs once before processing starts
Writes a single record with key "setup" and count 1. Runs when option.mapper.setup=true.
`map()` — runs once per input record
Reads the first field of each input record (record.get(0)), and writes it with count 1. Runs when option.mapper.map=true. In this example, the map argument enables this method, which is why the output contains two rows — one per input row.
`cleanup()` — runs once after all records are processed
Writes a single record with key "cleanup" and count 1. Runs when option.mapper.cleanup=true.
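Putting the three methods together: if the job were run with all three flags enabled (third argument `setup,map,cleanup`), the output records would interleave in lifecycle order. The simulation below is a framework-free sketch of that ordering for a single map task (names are illustrative, not SDK APIs):

```java
import java.util.ArrayList;
import java.util.List;

// Simulates one map task's lifecycle: setup() once, map() once per record,
// cleanup() once. Each enabled phase emits a record, in that order.
public class LifecycleSim {
    static List<String> run(List<String> input, boolean setup, boolean map, boolean cleanup) {
        List<String> out = new ArrayList<>();
        if (setup) out.add("setup | 1");                       // before any record
        for (String line : input) {
            if (map) out.add(line.split(",")[0] + " | 1");     // once per record
        }
        if (cleanup) out.add("cleanup | 1");                   // after all records
        return out;
    }

    public static void main(String[] args) {
        for (String row : run(List.of("hello,odps", "hello,odps"), true, true, true)) {
            System.out.println(row);
        }
    }
}
```

With the two-row test input this prints `setup | 1`, then `hello | 1` twice, then `cleanup | 1`; in a real multi-task job, each map task would contribute its own setup and cleanup records.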