This topic describes common MapReduce classes and methods.

If you use Maven, you can search for odps-sdk-mapred in the Maven repository to find the latest version of the SDK for Java. You can declare the SDK in your project by using the following Maven dependency:
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <version>0.26.2-public</version>
</dependency>

Data types

The data types that MaxCompute MapReduce supports include BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, and DECIMAL. The following table describes the mapping between MaxCompute and Java data types.
MaxCompute data type    Java data type
BIGINT                  Long
STRING                  String
DOUBLE                  Double
BOOLEAN                 Boolean
DATETIME                Date
DECIMAL                 BigDecimal
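
The mapping above can also be written as a small lookup table. The following sketch uses only the JDK. The class name OdpsJavaTypes and the method javaTypeFor are hypothetical and are not part of the SDK. The sketch assumes that the Java types for DATETIME and DECIMAL are java.util.Date and java.math.BigDecimal.

```java
import java.math.BigDecimal;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for illustration; it is not part of odps-sdk-mapred.
class OdpsJavaTypes {
    private static final Map<String, Class<?>> TYPES = new LinkedHashMap<>();

    static {
        TYPES.put("BIGINT", Long.class);
        TYPES.put("STRING", String.class);
        TYPES.put("DOUBLE", Double.class);
        TYPES.put("BOOLEAN", Boolean.class);
        TYPES.put("DATETIME", Date.class);       // assumed to be java.util.Date
        TYPES.put("DECIMAL", BigDecimal.class);  // assumed to be java.math.BigDecimal
    }

    // Returns the Java class for a MaxCompute type name (case-insensitive).
    static Class<?> javaTypeFor(String maxComputeType) {
        Class<?> type = TYPES.get(maxComputeType.toUpperCase(Locale.ROOT));
        if (type == null) {
            throw new IllegalArgumentException("Unsupported type: " + maxComputeType);
        }
        return type;
    }

    public static void main(String[] args) {
        TYPES.forEach((name, type) -> System.out.println(name + " -> " + type.getName()));
    }
}
```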

MapReduce classes

Class Description
MapperBase The base class that user-defined mapper classes must inherit. A mapper converts the records in the input table to key-value pairs and passes the key-value pairs to a reducer. Alternatively, a mapper can skip the reduce stage and write its results directly to the result table. Jobs that skip the reduce stage and return computing results directly are called map-only jobs.
ReducerBase The base class that user-defined reducer classes must inherit. A reducer reduces a set of values associated with a key.
TaskContext Describes the context of a task. The task context is an input parameter of multiple member methods of MapperBase and ReducerBase.
JobClient Defines a job client. A job client submits and manages jobs. A job client can submit a job in blocking or non-blocking mode. The blocking mode is a synchronous mode, whereas the non-blocking mode is an asynchronous mode.
RunningJob Defines a running job. The objects of this class are used to track the instances of running MapReduce jobs.
JobConf Describes the configuration of a MapReduce job. The JobConf object is defined in the main function. Then, a job client submits a job to MaxCompute based on the JobConf object.

MapperBase

The following table describes the methods of the MapperBase class.
Method Description
void cleanup(TaskContext context) The method that is called after the map method at the end of the map stage.
void map(long key, Record record, TaskContext context) Processes records in the input table.
void setup(TaskContext context) The method that is called before the map method at the beginning of the map stage.
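
The call order in the preceding table (setup once, map for each record, cleanup once) can be illustrated with a standalone simulation. The following sketch uses only the JDK and does not call the SDK. SketchMapper, LifecycleSketch, and CountingMapper are hypothetical names, and a List<String> stands in for the task context. According to the ReducerBase table, reducers follow the same pattern with the reduce method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the setup/map/cleanup contract; not the SDK's MapperBase.
abstract class SketchMapper {
    void setup(List<String> out) {}
    abstract void map(long key, String record, List<String> out);
    void cleanup(List<String> out) {}
}

class LifecycleSketch {
    // Drives a mapper in the documented order: setup, map per record, cleanup.
    static List<String> run(SketchMapper mapper, List<String> input) {
        List<String> out = new ArrayList<>();
        mapper.setup(out);
        long key = 0;
        for (String record : input) {
            mapper.map(key++, record, out);
        }
        mapper.cleanup(out);
        return out;
    }

    // Counts records in map and emits the total only in cleanup.
    static class CountingMapper extends SketchMapper {
        private long seen;

        @Override
        void setup(List<String> out) {
            seen = 0;
        }

        @Override
        void map(long key, String record, List<String> out) {
            seen++;
        }

        @Override
        void cleanup(List<String> out) {
            out.add("records=" + seen);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new CountingMapper(), Arrays.asList("a", "b", "c")));
    }
}
```

Emitting in cleanup is useful when a mapper accumulates state across records and needs to write it once at the end of the map stage.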

ReducerBase

The following table describes the methods of the ReducerBase class.
Method Description
void cleanup(TaskContext context) The method that is called after the reduce method at the end of the reduce stage.
void reduce(Record key, Iterator<Record> values, TaskContext context) Processes the set of values that are associated with a key.
void setup(TaskContext context) The method that is called before the reduce method at the beginning of the reduce stage.

TaskContext

The following table describes the methods of the TaskContext class.
Method Description
TableInfo[] getOutputTableInfo() Obtains information about the output table.
Record createOutputRecord() Creates records for the default output table.
Record createOutputRecord(String label) Creates records for the output table with the specified label.
Record createMapOutputKeyRecord() Creates records for keys in the key-value pairs that are generated at the map stage.
Record createMapOutputValueRecord() Creates records for values in the key-value pairs that are generated at the map stage.
void write(Record record) Writes records to the default output table. The method can be called multiple times at the reduce stage.
void write(Record record, String label) Writes records to the output table with the specified label. The method can be called multiple times at the reduce stage.
void write(Record key, Record value) Writes a key-value pair as the intermediate output of the map stage. The method can be called multiple times at the map stage.
BufferedInputStream readResourceFileAsStream(String resourceName) Reads a file resource.
Iterator<Record> readResourceTable(String resourceName) Reads a table resource.
Counter getCounter(Enum<?> name) Obtains the counter with the specified name.
Counter getCounter(String group, String name) Obtains the counter with the specified name in the specified group.
void progress() Sends heartbeat information to the MapReduce framework. If a task processes data for a long time without otherwise interacting with the framework, you can call this method to avoid a task timeout. The default timeout period for a task is 600 seconds.
  • If a worker runs for a long time and the framework determines that the worker has timed out, the framework stops the worker. To prevent this, call the progress method of the TaskContext class. The progress method only sends heartbeat information to the framework. It does not report how far the worker has progressed.
  • In MaxCompute MapReduce, the default timeout period for a worker is 10 minutes (600 seconds). You cannot change the timeout period. If a worker does not send heartbeat information by calling the progress method within 10 minutes, the framework terminates the worker and the map or reduce task fails. Therefore, we recommend that you call the progress method periodically in long-running map or reduce tasks.
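
One way to follow this recommendation is to send a heartbeat after every fixed number of records. The following sketch uses only the JDK. HeartbeatSketch and its parameters are hypothetical, and the Runnable stands in for a call to the progress method of the task context. The interval of 1,000 records is an arbitrary choice. In practice, choose an interval that is reached well within the timeout period.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of periodic heartbeats; it does not call the SDK.
class HeartbeatSketch {
    // Processes `total` items and invokes `heartbeat` after every `interval` items.
    static long process(long total, long interval, Runnable heartbeat) {
        long processed = 0;
        for (long i = 0; i < total; i++) {
            // Expensive per-record work would go here.
            processed++;
            if (processed % interval == 0) {
                heartbeat.run(); // stands in for context.progress()
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        AtomicInteger beats = new AtomicInteger();
        long processed = process(10_000, 1_000, beats::incrementAndGet);
        System.out.println("processed=" + processed + ", heartbeats=" + beats.get());
    }
}
```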

JobConf

The following table describes the methods of the JobConf class.
Method Description
void setResources(String resourceNames) Declares the resources that are used in the current job. A mapper or reducer can read only declared resources, and it reads them by using the TaskContext object.
void setMapOutputKeySchema(Column[] schema) Sets the attributes of keys that are passed from the mapper to the reducer.
void setMapOutputValueSchema(Column[] schema) Sets the attributes of values that are passed from the mapper to the reducer.
void setOutputKeySortColumns(String[] cols) Sets the columns for sorting the keys that are passed from the mapper to the reducer.
void setOutputGroupingColumns(String[] cols) Sets the columns for grouping the keys.
void setMapperClass(Class<? extends Mapper> theClass) Sets a mapper for a job.
void setPartitionColumns(String[] cols) Sets the partition key columns for a job. By default, the partition key columns are all columns of the keys that are generated by the mapper.
void setReducerClass(Class<? extends Reducer> theClass) Sets a reducer for a job.
void setCombinerClass(Class<? extends Reducer> theClass) Sets a combiner for a job. A combiner combines records with the same key. It is similar to a reducer but works at the map stage.
void setSplitSize(long size) Sets the split size, in MB. The default split size is 256 MB.
void setNumReduceTasks(int n) Sets the number of reduce tasks. By default, the number of reduce tasks is one-fourth of the number of map tasks.
void setMemoryForMapTask(int mem) Sets the memory available to a worker in a map task, in MB. The default memory size is 2048 MB.
void setMemoryForReduceTask(int mem) Sets the memory available to a worker in a reduce task, in MB. The default memory size is 2048 MB.
  • The grouping columns are selected from the sort columns. The sort columns and partition key columns must exist in keys.
  • At the map stage, the hash values of records from a mapper are calculated based on the specified partition key columns. The hash values help determine the reducers to which records are passed. The records are sorted based on the sort columns before the records are passed to reducers.
  • At the reduce stage, input records are grouped based on the grouping columns. Then, a group of records that share the same key are passed to the reduce method as one input.
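
The interaction of partition key columns, sort columns, and grouping columns can be illustrated with a standalone simulation. The following sketch uses only the JDK and is not the framework's implementation. ShuffleSketch and the two-column key (user, ts) are hypothetical, and the hash function is an assumption. The sketch partitions on user, sorts on user and ts, and groups on user, so that each reduce call receives the values of one user ordered by ts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of partition, sort, and group; it does not call the SDK.
class ShuffleSketch {
    // Hypothetical key with two columns: user and ts.
    static final class Key {
        final String user;
        final long ts;

        Key(String user, long ts) {
            this.user = user;
            this.ts = ts;
        }
    }

    // Partition step: hash only the partition key column (user) to pick a reducer.
    static int partition(Key key, int numReducers) {
        return Math.floorMod(key.user.hashCode(), numReducers);
    }

    // Sort step: order keys by the sort columns (user, then ts).
    static final Comparator<Key> SORT =
        Comparator.comparing((Key k) -> k.user).thenComparingLong(k -> k.ts);

    // Group step: sorted keys that share the grouping column (user) form one reduce input.
    static Map<String, List<Long>> shuffleForReducer(
            List<Key> mapOutput, int reducer, int numReducers) {
        List<Key> mine = new ArrayList<>();
        for (Key k : mapOutput) {
            if (partition(k, numReducers) == reducer) {
                mine.add(k);
            }
        }
        mine.sort(SORT);
        Map<String, List<Long>> groups = new LinkedHashMap<>();
        for (Key k : mine) {
            groups.computeIfAbsent(k.user, u -> new ArrayList<>()).add(k.ts);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Key> mapOutput = Arrays.asList(
            new Key("b", 2), new Key("a", 3), new Key("a", 1), new Key("b", 1));
        // With a single reducer, every key lands in partition 0.
        System.out.println(shuffleForReducer(mapOutput, 0, 1));
    }
}
```

Because the grouping column is a prefix of the sort columns, all records of one group are adjacent after sorting, which is why the grouping columns must be selected from the sort columns.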

JobClient

The following table describes the methods of the JobClient class.
Method Description
static RunningJob runJob(JobConf job) Submits a MapReduce job in blocking mode and returns a RunningJob object.
static RunningJob submitJob(JobConf job) Submits a MapReduce job in non-blocking mode and returns a RunningJob object.

RunningJob

The following table describes the methods of the RunningJob class.
Method Description
String getInstanceID() Obtains the ID of a job instance. You can use the job instance ID to view operational logs and manage jobs.
boolean isComplete() Checks whether a job is completed.
boolean isSuccessful() Checks whether a job instance is successful.
void waitForCompletion() Waits for a job instance to end. The method is used for jobs that are submitted in asynchronous mode.
JobStatus getJobStatus() Checks the running status of a job instance.
void killJob() Ends the current job.
Counters getCounters() Obtains the counter information.
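
The difference between the blocking and non-blocking modes can be illustrated with a JDK-only analogy. The following sketch does not call the SDK. SubmitSketch is hypothetical, its two methods only borrow the names runJob and submitJob, and a CompletableFuture stands in for the RunningJob handle. The sketch assumes that blocking submission is conceptually the same as non-blocking submission followed by a wait.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical analogy for blocking and non-blocking submission; not the SDK's JobClient.
class SubmitSketch {
    // Non-blocking (asynchronous) submission: returns a handle immediately.
    static CompletableFuture<String> submitJob(Supplier<String> job) {
        return CompletableFuture.supplyAsync(job);
    }

    // Blocking (synchronous) submission: submits the job, then waits for it to end.
    static String runJob(Supplier<String> job) {
        return submitJob(job).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> handle = submitJob(() -> "SUCCESS");
        // The caller is free to do other work here before it waits.
        System.out.println("async result: " + handle.join());
        System.out.println("sync result: " + runJob(() -> "SUCCESS"));
    }
}
```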

InputUtils

The following table describes the methods of the InputUtils class.
Method Description
static void addTable(TableInfo table, JobConf conf) Adds an input table to a task. The method can be called multiple times. New tables are appended to the input queue.
static void setTables(TableInfo[] tables, JobConf conf) Adds multiple input tables to a task.

OutputUtils

The following table describes the methods of the OutputUtils class.
Method Description
static void addTable(TableInfo table, JobConf conf) Adds an output table to a task. The method can be called multiple times. New tables are appended to the output queue.
static void setTables(TableInfo[] tables, JobConf conf) Adds multiple output tables to a task.

Pipeline

Pipeline is the main class of the extended MapReduce model. You can call the Pipeline.builder method to obtain a builder and use the builder to construct a pipeline. The following code shows the methods of the builder:
    public Builder addMapper(Class<? extends Mapper> mapper)
    public Builder addMapper(Class<? extends Mapper> mapper,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder addReducer(Class<? extends Reducer> reducer)
    public Builder addReducer(Class<? extends Reducer> reducer,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder setOutputKeySchema(Column[] keySchema)
    public Builder setOutputValueSchema(Column[] valueSchema)
    public Builder setOutputKeySortColumns(String[] sortCols)
    public Builder setOutputKeySortOrder(SortOrder[] order)
    public Builder setPartitionColumns(String[] partCols)
    public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
    public Builder setOutputGroupingColumns(String[] cols)
The following example shows how to call the Pipeline.builder method to build a pipeline:
    // Build a pipeline in which one mapper is followed by two reducers.
    Job job = new Job();
    Pipeline pipeline = Pipeline.builder()
        // Mapper output: key = word, value = count.
        .addMapper(TokenizerMapper.class)
        .setOutputKeySchema(
            new Column[] { new Column("word", OdpsType.STRING) })
        .setOutputValueSchema(
            new Column[] { new Column("count", OdpsType.BIGINT) })
        // First reducer output: key = count, value = (word, count).
        .addReducer(SumReducer.class)
        .setOutputKeySchema(
            new Column[] { new Column("count", OdpsType.BIGINT) })
        .setOutputValueSchema(
            new Column[] { new Column("word", OdpsType.STRING),
                           new Column("count", OdpsType.BIGINT) })
        // Second reducer: the final stage of the pipeline.
        .addReducer(IdentityReducer.class)
        .createPipeline();
    job.setPipeline(pipeline);
    job.addInput(...);
    job.addOutput(...);
    job.submit();
As shown in the preceding example, in the main function, you can create a MapReduce job in which a mapper is followed by two reducers. If you are familiar with the basic features of MapReduce, the extended MapReduce model is easy to use.
Note
  • Before you use the extended MapReduce model, we recommend that you learn how to use MapReduce.
  • If you use JobConf, you can configure only MapReduce jobs in which a mapper is followed by a single reducer.
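
The data flow of the pipeline example (a mapper followed by two reducers) can be traced with a standalone simulation. The following sketch uses only the JDK and does not call the SDK. PipelineSketch and its methods are hypothetical. The sketch assumes that the mapper emits (word, 1), that the first reducer sums the counts and re-keys its output by count, and that the second reducer receives records sorted by count in ascending order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of a mapper followed by two reducers; it does not call the SDK.
class PipelineSketch {
    // Stage 1 (mapper): emit (word, 1) for every whitespace-separated token.
    static List<Map.Entry<String, Long>> tokenize(List<String> lines) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.add(new SimpleEntry<>(word, 1L));
                }
            }
        }
        return out;
    }

    // Stage 2 (first reducer): group by word, sum the counts, re-key the output by count.
    static List<Map.Entry<Long, String>> sum(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> totals = new TreeMap<>(); // TreeMap models the sort on the key
        for (Map.Entry<String, Long> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Long::sum);
        }
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        for (Map.Entry<String, Long> t : totals.entrySet()) {
            out.add(new SimpleEntry<>(t.getValue(), t.getKey()));
        }
        return out;
    }

    // Stage 3 (second reducer): records arrive sorted by count and pass through unchanged.
    static List<String> identity(List<Map.Entry<Long, String>> byCount) {
        List<Map.Entry<Long, String>> sorted = new ArrayList<>(byCount);
        sorted.sort(Map.Entry.comparingByKey());
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : sorted) {
            out.add(e.getValue() + "=" + e.getKey());
        }
        return out;
    }

    static List<String> run(List<String> lines) {
        return identity(sum(tokenize(lines)));
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b a", "c a b")));
    }
}
```

The second reduce stage exists because the first stage changes the key from word to count. Sorting on the new key requires another shuffle, which a job with a single reducer cannot express.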