全部产品
Search
文档中心

MaxCompute:Contoh SecondarySort

更新时间:Jun 19, 2025

Topik ini menjelaskan cara menjalankan SecondarySort di MapReduce.

Prasyarat

Selesaikan konfigurasi lingkungan untuk pengujian, lihat Memulai.

Persiapan

  1. Paket JAR program uji telah disiapkan. Dalam topik ini, paket JAR bernama mapreduce-examples.jar dan disimpan di direktori bin\data\resources pada jalur instalasi lokal MaxCompute.

  2. Siapkan tabel uji dan sumber daya untuk SecondarySort.

    1. Buat tabel uji.

      CREATE TABLE ss_in(key BIGINT, value BIGINT);
      CREATE TABLE ss_out(key BIGINT, value BIGINT);
    2. Tambahkan sumber daya uji.

      -- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f.
      add jar data\resources\mapreduce-examples.jar -f;
  3. Gunakan Tunnel untuk mengimpor file data.txt dari direktori bin klien MaxCompute ke tabel ss_in.

    tunnel upload data.txt ss_in;

    Data berikut diimpor ke tabel ss_in:

    1,2
    2,1
    1,1
    2,2

Prosedur

Jalankan SecondarySort pada klien MaxCompute.

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar 
com.aliyun.odps.mapred.open.example.SecondarySort ss_in ss_out;

Hasil yang Diharapkan

Pekerjaan berjalan normal. Data berikut dikembalikan di tabel ss_out:

+------------+------------+
| key        | value      |
+------------+------------+
| 1          | 1          |
| 1          | 2          |
| 2          | 1          |
| 2          | 2          |
+------------+------------+

Kode Contoh

Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.data.TableInfo;
/**
     *
     * Ini adalah contoh aplikasi ODPS Map/Reduce. Aplikasi membaca tabel input yang
     * harus berisi dua bilangan bulat per rekaman. Output diurutkan berdasarkan angka pertama dan
     * kedua serta dikelompokkan berdasarkan angka pertama.
     *
     **/
public class SecondarySort {
    /**
       * Baca dua bilangan bulat dari setiap baris dan hasilkan pasangan kunci, nilai sebagai ((kiri,
       * kanan), kanan).
       **/
    public static class MapClass extends MapperBase {
        private Record key;
        private Record value;
        @Override
            public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }
        @Override
            public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
            long left = 0;
            long right = 0;
            if (record.getColumnCount() > 0) {
                left = (Long) record.get(0);
                if (record.getColumnCount() > 1) {
                    right = (Long) record.get(1);
                }
                key.set(new Object[] { (Long) left, (Long) right });
                value.set(new Object[] { (Long) right });
                context.write(key, value);
            }
        }
    }
    /**
       * Kelas reducer yang hanya mengeluarkan jumlah dari nilai input.
       **/
    public static class ReduceClass extends ReducerBase {
        private Record result = null;
        @Override
            public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }
        @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
            result.set(0, key.get(0));
            while (values.hasNext()) {
                Record value = values.next();
                result.set(1, value.get(0));
                context.write(result);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: secondarysrot <in> <out>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        /** Tetapkan beberapa kolom sebagai kunci. */
        //bandingkan bagian pertama dan kedua dari pasangan
        job.setOutputKeySortColumns(new String[] { "i1", "i2" });
        //partisi berdasarkan bagian pertama dari pasangan
        job.setPartitionColumns(new String[] { "i1" });
        //pengelompokan berdasarkan bagian pertama dari pasangan
        job.setOutputGroupingColumns(new String[] { "i1" });
        //output peta adalah LongPair, Long
        job.setMapOutputKeySchema(SchemaUtils.fromString("i1:bigint,i2:bigint"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("i2x:bigint"));
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        JobClient.runJob(job);
        System.exit(0);
    }
}