MaxCompute: Overview

Last Updated: Mar 26, 2026

This page documents the core MapReduce classes and methods in the MaxCompute Java SDK (odps-sdk-mapred).

Add the SDK dependency

Search for odps-sdk-mapred in the Maven repository to find the latest version. Add the following dependency to your project:

<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <version>0.40.10-public</version>
</dependency>

Data types

MaxCompute MapReduce supports the following data types. The table below shows the mapping to Java types.

MaxCompute type    Java type
BIGINT             Long
STRING             String
DOUBLE             Double
BOOLEAN            Boolean
DATETIME           Date
DECIMAL            BigDecimal
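
The mapping above matters when you populate records, because a boxed Integer is not a Long. The following plain-Java sketch shows one way to check a value against the table before writing it. The class TypeMappingSketch and its matches helper are hypothetical names for illustration and are not part of the SDK; Date is assumed to mean java.util.Date.

```java
import java.math.BigDecimal;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

public class TypeMappingSketch {
    // Mirrors the data type table; this lookup is illustrative, not an SDK API.
    static final Map<String, Class<?>> JAVA_TYPES = new LinkedHashMap<>();
    static {
        JAVA_TYPES.put("BIGINT", Long.class);
        JAVA_TYPES.put("STRING", String.class);
        JAVA_TYPES.put("DOUBLE", Double.class);
        JAVA_TYPES.put("BOOLEAN", Boolean.class);
        JAVA_TYPES.put("DATETIME", Date.class);   // assumed to be java.util.Date
        JAVA_TYPES.put("DECIMAL", BigDecimal.class);
    }

    // Returns true when value is an instance of the Java type mapped to the column type.
    static boolean matches(String maxComputeType, Object value) {
        Class<?> expected = JAVA_TYPES.get(maxComputeType);
        return expected != null && expected.isInstance(value);
    }

    public static void main(String[] args) {
        System.out.println(matches("BIGINT", 42L)); // prints true
        System.out.println(matches("BIGINT", 42));  // prints false: 42 boxes to Integer
    }
}
```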

Classes overview

Class Description
MapperBase Base class for user-defined mappers. Converts input table records into key-value pairs and passes them to a reducer. Jobs that skip the reduce stage and write results directly are called MapOnly jobs.
ReducerBase Base class for user-defined reducers. Aggregates the set of values associated with each key.
TaskContext Provides the task execution context. Passed as an input parameter to the lifecycle methods of MapperBase and ReducerBase.
JobClient Submits and manages jobs. Supports both blocking (synchronous) and non-blocking (asynchronous) submission.
RunningJob Represents a running job instance. Use it to track status, wait for completion, and retrieve counters.
JobConf Holds the configuration for a MapReduce job. Define a JobConf in your main function, then pass it to JobClient to submit the job.

MapperBase

The framework calls the mapper lifecycle methods in this order: setup once at the start, map once per input record, and cleanup once at the end.

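import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.TaskContext;

public class WordCountMapper extends MapperBase {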
    @Override
    public void setup(TaskContext context) throws IOException {
        // Initialize resources (for example, load lookup tables)
    }

    @Override
    public void map(long key, Record record, TaskContext context) throws IOException {
        // Process each input record and emit key-value pairs.
        // MaxCompute uses (long key, Record record), not (KEYIN key, VALUEIN value).
        Record mapKey = context.createMapOutputKeyRecord();
        Record mapValue = context.createMapOutputValueRecord();
        // Populate mapKey and mapValue, then emit:
        context.write(mapKey, mapValue);
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
        // Release resources or emit final accumulated results
    }
}

Method Description
void setup(TaskContext context) Called once before the first map call. Use it to initialize shared state or load resources.
void map(long key, Record record, TaskContext context) Called once per input record. Emit key-value pairs with context.write(key, value) or write directly to an output table.
void cleanup(TaskContext context) Called once after the last map call. Use it to release resources or flush buffered output.
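
To make the call order concrete, here is a minimal plain-Java simulation of the mapper lifecycle. It does not use the SDK: CountingMapper and run are hypothetical stand-ins, and the long key is modeled as a running record index, which is an assumption. The sketch accumulates word counts across map calls and emits them in cleanup, one use of the "emit final accumulated results" pattern mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapperLifecycleSketch {
    // Hypothetical stand-in for a mapper; not a subclass of the SDK's MapperBase.
    static class CountingMapper {
        private Map<String, Long> counts;
        final List<String> emitted = new ArrayList<>();

        // Called once: allocate the state shared by all map() calls.
        void setup() {
            counts = new TreeMap<>();
        }

        // Called once per input record: update the in-memory counts.
        void map(long key, String line) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }

        // Called once: emit the accumulated results.
        void cleanup() {
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                emitted.add(e.getKey() + "=" + e.getValue());
            }
        }
    }

    // Drives the mapper in the order described above: setup, map per record, cleanup.
    static List<String> run(List<String> lines) {
        CountingMapper mapper = new CountingMapper();
        mapper.setup();
        long key = 0; // assumption: the key is a running record index
        for (String line : lines) {
            mapper.map(key++, line);
        }
        mapper.cleanup();
        return mapper.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b a", "b c"))); // prints [a=2, b=2, c=1]
    }
}
```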

ReducerBase

The framework calls the reducer lifecycle methods in this order: setup once at the start, reduce once per unique key group, and cleanup once at the end.

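import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;

public class WordCountReducer extends ReducerBase {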
    @Override
    public void setup(TaskContext context) throws IOException {
        // Initialize resources
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
        // Aggregate all values for this key
        long count = 0;
        while (values.hasNext()) {
            Record val = values.next();
            count += val.getBigint(0);
        }
        Record output = context.createOutputRecord();
        output.set(0, key.getString(0));
        output.set(1, count);
        context.write(output);
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
        // Release resources
    }
}

Method Description
void setup(TaskContext context) Called once before the first reduce call. Use it to initialize shared state or load resources.
void reduce(Record key, Iterator<Record> values, TaskContext context) Called once per unique key group. All values associated with the key are passed as values.
void cleanup(TaskContext context) Called once after the last reduce call. Use it to release resources or flush buffered output.
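
The "once per unique key group" contract can be illustrated without the SDK. In the sketch below, runReduceStage groups map output by key and hands each group to reduce as an Iterator, roughly as the framework is described to do. All names (ReduceGroupingSketch, pair, runReduceStage) are hypothetical, and a String key with Long values stands in for Record objects.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceGroupingSketch {
    // Convenience factory for a (key, value) pair emitted by the map stage.
    static Map.Entry<String, Long> pair(String key, long value) {
        return new AbstractMap.SimpleEntry<String, Long>(key, value);
    }

    // Hypothetical reduce function: sums all values of one key group.
    static long reduce(String key, Iterator<Long> values) {
        long count = 0;
        while (values.hasNext()) {
            count += values.next();
        }
        return count;
    }

    // Groups map output by key, then calls reduce exactly once per unique key.
    static Map<String, Long> runReduceStage(List<Map.Entry<String, Long>> mapOutput) {
        Map<String, List<Long>> groups = new TreeMap<>(); // sorted by key
        for (Map.Entry<String, Long> p : mapOutput) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> group : groups.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue().iterator()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> mapOutput =
            Arrays.asList(pair("b", 1), pair("a", 1), pair("b", 1));
        System.out.println(runReduceStage(mapOutput)); // prints {a=1, b=2}
    }
}
```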

TaskContext

TaskContext is passed to each lifecycle method and provides access to output tables, resources, and framework utilities.

Method Description
TableInfo[] getOutputTableInfo() Returns information about the output tables.
Record createOutputRecord() Creates a record for the default output table.
Record createOutputRecord(String label) Creates a record for the output table identified by label.
Record createMapOutputKeyRecord() Creates a record for the map-stage output key.
Record createMapOutputValueRecord() Creates a record for the map-stage output value.
void write(Record record) Writes a record to the default output table. Can be called multiple times during the reduce stage.
void write(Record record, String label) Writes a record to the output table identified by label. Can be called multiple times during the reduce stage.
void write(Record key, Record value) Emits a key-value pair during the map stage. Can be called multiple times.
BufferedInputStream readResourceFileAsStream(String resourceName) Reads a file resource by name.
Iterator<Record> readResourceTable(String resourceName) Reads a table resource by name.
Counter getCounter(Enum<?> name) Returns the counter with the specified name.
Counter getCounter(String group, String name) Returns the counter with the specified name in the specified group.
void progress() Sends a heartbeat to the MapReduce framework to prevent worker timeout.

Worker timeout

The worker timeout is fixed at 10 minutes and cannot be changed. If a worker does not call progress() within 10 minutes, the framework terminates the worker and the map or reduce task fails. Call progress() periodically in long-running tasks to keep workers alive. The method only sends a heartbeat; it does not report how far the task has progressed.
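
One way to call progress() periodically without sending a heartbeat on every record is a small rate limiter. The following is a minimal sketch under stated assumptions: HeartbeatSketch is a hypothetical helper, the Runnable stands in for a call such as context.progress(), and the injected clock exists only so the behavior can be checked without waiting.

```java
import java.util.function.LongSupplier;

public class HeartbeatSketch {
    private final Runnable progress;        // stand-in for a call to context.progress()
    private final LongSupplier clockMillis; // injected clock, so tests can fake time
    private final long intervalMillis;
    private long lastBeat;

    HeartbeatSketch(Runnable progress, LongSupplier clockMillis, long intervalMillis) {
        this.progress = progress;
        this.clockMillis = clockMillis;
        this.intervalMillis = intervalMillis;
        this.lastBeat = clockMillis.getAsLong();
    }

    // Call inside long loops; forwards to progress only when the interval has elapsed.
    void tick() {
        long now = clockMillis.getAsLong();
        if (now - lastBeat >= intervalMillis) {
            progress.run();
            lastBeat = now;
        }
    }

    public static void main(String[] args) {
        long[] fakeNow = {0};
        int[] beats = {0};
        HeartbeatSketch hb = new HeartbeatSketch(() -> beats[0]++, () -> fakeNow[0], 60_000);
        // Simulate 30 minutes of work, ticking once per simulated minute.
        for (int minute = 1; minute <= 30; minute++) {
            fakeNow[0] = minute * 60_000L;
            hb.tick();
        }
        System.out.println(beats[0]); // prints 30
    }
}
```

In a real mapper or reducer, the interval should stay well below the 10-minute limit so that a slow iteration does not cause the heartbeat to arrive late.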

JobConf

JobConf holds all configuration for a MapReduce job, including mapper and reducer classes, key-value schemas, and resource declarations.

Method Description
void setMapperClass(Class<? extends Mapper> theClass) Sets the mapper class for the job.
void setReducerClass(Class<? extends Reducer> theClass) Sets the reducer class for the job.
void setCombinerClass(Class<? extends Reducer> theClass) Sets a combiner class. A combiner pre-aggregates records with the same key at the map stage, reducing data transferred to reducers.
void setMapOutputKeySchema(Column[] schema) Defines the schema for keys passed from the mapper to the reducer.
void setMapOutputValueSchema(Column[] schema) Defines the schema for values passed from the mapper to the reducer.
void setOutputKeySortColumns(String[] cols) Sets the columns used to sort keys before they are passed to reducers.
void setOutputGroupingColumns(String[] cols) Sets the columns used to group keys. Grouping columns must be a subset of sort columns.
void setPartitionColumns(String[] cols) Sets the partition key columns. Defaults to all key columns.
void setResources(String resourceNames) Declares the resources available to mappers and reducers. A mapper or reducer can only read resources declared here.
void setSplitSize(long size) Sets the input split size in MB. Default: 256 MB.
void setNumReduceTasks(int n) Sets the number of reduce tasks. Default: one-quarter of the number of map tasks.
void setMemoryForMapTask(int mem) Sets the memory per map worker in MB. Default: 2048 MB.
void setMemoryForReduceTask(int mem) Sets the memory per reduce worker in MB. Default: 2048 MB.
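
The split size and reducer defaults interact, which a little arithmetic makes visible. The sketch below is an estimate only: it assumes the number of map tasks is roughly the input size divided by the split size, and that the one-quarter default rounds down with a minimum of 1. Neither rounding rule is stated in this page, and TaskCountSketch is a hypothetical helper, not an SDK class.

```java
public class TaskCountSketch {
    // Assumption: map tasks are roughly ceil(input size / split size), at least 1.
    static long estimateMapTasks(long inputMb, long splitSizeMb) {
        if (splitSizeMb <= 0) {
            throw new IllegalArgumentException("splitSizeMb must be positive");
        }
        return Math.max(1, (inputMb + splitSizeMb - 1) / splitSizeMb);
    }

    // Default from the table above: one-quarter of the map tasks (rounding is assumed).
    static long defaultReduceTasks(long mapTasks) {
        return Math.max(1, mapTasks / 4);
    }

    public static void main(String[] args) {
        long mapTasks = estimateMapTasks(10_240, 256); // 10 GB input, default split size
        System.out.println(mapTasks);                     // prints 40
        System.out.println(defaultReduceTasks(mapTasks)); // prints 10
    }
}
```

Under these assumptions, halving the split size doubles the estimated map tasks and, unless setNumReduceTasks is called, the default number of reduce tasks as well.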

How the map and reduce stages distribute and group records

At the map stage, the framework computes a hash of each output record based on the partition key columns to determine which reducer receives it. Records are sorted by the sort columns before being sent.

At the reduce stage, records are grouped by the grouping columns. All records sharing the same grouping key are passed together to a single reduce() call.

The grouping columns must be chosen from the sort columns. Both the sort columns and the partition key columns must be columns of the map output key.
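
The interplay of partition, sort, and grouping columns can be simulated in plain Java. The sketch below uses a hypothetical two-column key (user, ts): it partitions by user, sorts by (user, ts), and groups by user, so each simulated reduce call sees one user's timestamps in ascending order. String.hashCode is used only as a stand-in; the hash function the framework actually uses is not specified here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShuffleSketch {
    // Hypothetical two-column map output key: (user, ts).
    static final class Key {
        final String user;
        final long ts;
        Key(String user, long ts) { this.user = user; this.ts = ts; }
    }

    // Partition column: user. The hash decides which reducer receives the record.
    static int partition(Key k, int numReducers) {
        return Math.floorMod(k.user.hashCode(), numReducers);
    }

    // For each reducer, returns the groups it would pass to reduce():
    // sorted by (user, ts), grouped by user.
    static List<Map<String, List<Long>>> shuffle(List<Key> keys, int numReducers) {
        List<List<Key>> perReducer = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            perReducer.add(new ArrayList<>());
        }
        for (Key k : keys) {
            perReducer.get(partition(k, numReducers)).add(k);
        }
        List<Map<String, List<Long>>> result = new ArrayList<>();
        for (List<Key> bucket : perReducer) {
            bucket.sort(Comparator.comparing((Key k) -> k.user).thenComparingLong(k -> k.ts));
            Map<String, List<Long>> groups = new LinkedHashMap<>();
            for (Key k : bucket) {
                groups.computeIfAbsent(k.user, u -> new ArrayList<>()).add(k.ts);
            }
            result.add(groups);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
            new Key("bob", 3), new Key("alice", 2), new Key("bob", 1), new Key("alice", 1));
        // With a single reducer, both users land in the same partition.
        System.out.println(shuffle(keys, 1)); // prints [{alice=[1, 2], bob=[1, 3]}]
    }
}
```

Because grouping uses only the user column while sorting uses both columns, this arrangement gives a secondary sort: values arrive ordered by ts within each group.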

JobClient

Method Description
static RunningJob runJob(JobConf job) Submits a job in blocking (synchronous) mode. Blocks until the job completes and returns a RunningJob representing the finished job.
static RunningJob submitJob(JobConf job) Submits a job in non-blocking (asynchronous) mode. Returns immediately with a RunningJob you can use to poll status or wait for completion.

RunningJob

Method Description
String getInstanceID() Returns the job instance ID. Use this ID to view operational logs and manage the job.
boolean isComplete() Returns true if the job has finished.
boolean isSuccessful() Returns true if the job completed successfully.
void waitForCompletion() Blocks until the job instance ends. Typically used with jobs submitted in non-blocking (asynchronous) mode through submitJob, because runJob already blocks until completion.
JobStatus getJobStatus() Returns the current status of the job instance.
void killJob() Terminates the running job.
Counters getCounters() Returns all counter data for the job.
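
For a job submitted in non-blocking mode, a polling loop over isComplete() and isSuccessful() is a common alternative to blocking. The sketch below shows the shape of such a loop in plain Java. JobHandle is a hypothetical interface that mirrors only those two methods; the real RunningJob methods may declare exceptions that this sketch does not model.

```java
public class JobPollingSketch {
    // Hypothetical stand-in that mirrors two RunningJob methods from the table above.
    interface JobHandle {
        boolean isComplete();
        boolean isSuccessful();
    }

    // Polls until the job finishes or maxPolls is reached; returns true only on success.
    static boolean pollUntilDone(JobHandle job, long pollMillis, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (job.isComplete()) {
                return job.isSuccessful();
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return false; // gave up waiting; the job may still be running
    }

    // Test double: reports completion on the given status check.
    static JobHandle completesAfter(final int checks, final boolean success) {
        return new JobHandle() {
            private int seen = 0;
            public boolean isComplete() { return ++seen >= checks; }
            public boolean isSuccessful() { return success; }
        };
    }

    public static void main(String[] args) {
        System.out.println(pollUntilDone(completesAfter(3, true), 1L, 10)); // prints true
    }
}
```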

InputUtils

Method Description
static void addTable(TableInfo table, JobConf conf) Adds a single input table to the job. Calling this multiple times appends each table to the input queue.
static void setTables(TableInfo[] tables, JobConf conf) Sets multiple input tables at once.

OutputUtils

Method Description
static void addTable(TableInfo table, JobConf conf) Adds a single output table to the job. Calling this multiple times appends each table to the output queue.
static void setTables(TableInfo[] tables, JobConf conf) Sets multiple output tables at once.

Pipeline (extended MapReduce model)

Pipeline is the entry point for the extended MapReduce model, which lets you chain multiple mappers and reducers in a single job. Use Pipeline.builder() to construct the pipeline.

Builder methods

public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
       Column[] keySchema, Column[] valueSchema, String[] sortCols,
       SortOrder[] order, String[] partCols,
       Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
       Column[] keySchema, Column[] valueSchema, String[] sortCols,
       SortOrder[] order, String[] partCols,
       Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)

Example

The following example chains one mapper and two reducers (TokenizerMapper -> SumReducer -> IdentityReducer):

Job job = new Job();
Pipeline pipeline = Pipeline.builder()
    .addMapper(TokenizerMapper.class)
    .setOutputKeySchema(
        new Column[] { new Column("word", OdpsType.STRING) })
    .setOutputValueSchema(
        new Column[] { new Column("count", OdpsType.BIGINT) })
    .addReducer(SumReducer.class)
    .setOutputKeySchema(
        new Column[] { new Column("count", OdpsType.BIGINT) })
    .setOutputValueSchema(
        new Column[] { new Column("word", OdpsType.STRING),
        new Column("count", OdpsType.BIGINT) })
    .addReducer(IdentityReducer.class).createPipeline();

job.setPipeline(pipeline);
job.addInput(...);
job.addOutput(...);
job.submit();

To chain a single mapper with one reducer, use JobConf instead of Pipeline. Get familiar with the standard MapReduce API before using the extended model.
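
To see what the three-stage chain in the example computes, it can help to approximate the data flow in plain Java. The sketch below is an interpretation, not SDK code: it assumes TokenizerMapper emits (word, 1), SumReducer sums per word and re-keys the result by count, and IdentityReducer writes records unchanged after the framework has sorted them by the count key. PipelineFlowSketch and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PipelineFlowSketch {
    static List<String> run(List<String> lines) {
        // Stage 1 (assumed TokenizerMapper) and stage 2 (assumed SumReducer):
        // emit (word, 1) per token, then sum the values for each word.
        Map<String, Long> sums = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    sums.merge(word, 1L, Long::sum);
                }
            }
        }

        // Shuffle into stage 3: the key is now the count, so records are sorted by count.
        List<Map.Entry<String, Long>> byCount = new ArrayList<>(sums.entrySet());
        byCount.sort(Map.Entry.comparingByValue());

        // Stage 3 (assumed IdentityReducer): write each record unchanged.
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, Long> e : byCount) {
            output.add(e.getValue() + " " + e.getKey());
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b a", "b a c"))); // prints [1 c, 2 b, 3 a]
    }
}
```

The second reducer exists because the final ordering depends on a value (the count) that is only known after the first reduce stage, which is the kind of job a single mapper and reducer cannot express in one pass.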