All Products
Search
Document Center

MaxCompute: Sort example

Last Updated:Sep 05, 2024

This topic provides an example of how to use MapReduce in MaxCompute to sort data.

Prerequisites

Complete the environment configuration required for testing. For more information, see Getting started.

Preparations

  1. Prepare the JAR package of the test program. In this topic, the JAR package is named mapreduce-examples.jar and stored in the bin\data\resources directory in the local installation path of MaxCompute.

  2. Prepare test tables and resources.

    1. Create test tables.

      CREATE TABLE ss_in(key BIGINT, value BIGINT);
      CREATE TABLE ss_out(key BIGINT, value BIGINT);
    2. Add test resources.

      -- The -f flag overwrites an existing resource that has the same name. You can omit it when you add the JAR package for the first time.
      add jar data\resources\mapreduce-examples.jar -f;
  3. Use Tunnel to import the data.txt file from the bin directory of the MaxCompute client into the ss_in table.

    tunnel upload data.txt ss_in;

    The following data is imported to the ss_in table:

     2,1
     1,1
     3,1

Procedure

Run the following command on the MaxCompute client to sort the data:

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Sort ss_in ss_out;

Expected result

The job succeeds, and the ss_out table contains the following data:

+------------+------------+
| key        | value      |
+------------+------------+
| 1          | 1          |
| 2          | 1          |
| 3          | 1          |
+------------+------------+

Sample code

For information about Project Object Model (POM) dependencies, see Precautions.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
 * A trivial MapReduce program that does nothing except use the framework to
 * partition and sort the input records.
 *
 * <p>The mapper emits column 0 of each input record as the map output key and column 1 as
 * the map output value. The framework sorts the map output by key, and a single reducer
 * writes every (key, value) pair to the output table, which yields a globally sorted result.
 *
 * <p>Usage: {@code sort <input> <output>}
 */
public class Sort {

    /**
     * Prints the command-line usage to standard output.
     *
     * @return {@code -1}, used by {@link #main} as the process exit status
     */
    static int printUsage() {
        System.out.println("sort <input> <output>");
        return -1;
    }

    /**
     * Identity mapper: copies column 0 of each input record into the map output key and
     * column 1 into the map output value.
     */
    public static class IdentityMapper extends MapperBase {
        private Record key;
        private Record value;

        /** Creates the key and value records once per task; they are reused for every input record. */
        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }

        /**
         * Emits {@code (record[0], record[1])} as the map output key/value pair.
         *
         * <p>Both columns are cast to {@link Long}, so a non-BIGINT column fails with a
         * {@link ClassCastException} rather than being silently passed through.
         */
        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            // The key and value schemas each have exactly one column (see main), so setting
            // index 0 is equivalent to set(new Object[]{...}) without a per-record array.
            key.set(0, (Long) record.get(0));
            value.set(0, (Long) record.get(1));
            context.write(key, value);
        }
    }

    /** Identity reducer: writes every (key, value) pair it receives to the output table. */
    public static class IdentityReducer extends ReducerBase {
        private Record result;

        /** Creates the output record once per task; it is reused for every row written. */
        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        /**
         * Writes one output row per value: column 0 is the key and column 1 is the value.
         * A key that occurs several times therefore produces one row per occurrence.
         */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            result.set(0, key.get(0));

            while (values.hasNext()) {
                Record val = values.next();
                result.set(1, val.get(0));
                context.write(result);
            }
        }
    }

    /**
     * Configures and submits the sort job, then prints the start time, end time, and elapsed
     * wall-clock time in whole seconds.
     *
     * @param args {@code args[0]} is the input table name and {@code args[1]} is the output
     *     table name; if fewer than two arguments are given, usage is printed and the process
     *     exits with a non-zero status
     * @throws Exception if configuring or running the job fails (for example, an error raised
     *     by {@code JobClient.runJob})
     */
    public static void main(String[] args) throws Exception {
        // Without this check, a missing argument surfaces as an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.exit(printUsage());
            return;
        }

        JobConf jobConf = new JobConf();
        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);
        // For a global sort, use a single reducer so that all data goes to the same reducer.
        // This only suits small data volumes; for large data volumes, use another approach
        // such as TeraSort.
        jobConf.setNumReduceTasks(1);
        jobConf.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint"));
        jobConf.setMapOutputValueSchema(SchemaUtils.fromString("value:bigint"));
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), jobConf);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), jobConf);

        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        JobClient.runJob(jobConf);
        Date endTime = new Date();
        System.out.println("Job ended: " + endTime);
        System.out.println("The job took " + (endTime.getTime() - startTime.getTime()) / 1000 + " seconds.");
    }
}