全部产品
Search
文档中心

MaxCompute:Ikhtisar

更新时间:Jun 19, 2025

Bagian ini menjelaskan kelas dan metode umum dalam MapReduce.

Jika Anda menggunakan Maven, Anda dapat mencari odps-sdk-mapred di repositori Maven untuk menemukan versi terbaru dari SDK untuk Java. Anda dapat mendeklarasikan SDK dalam proyek Anda dengan menggunakan dependensi Maven berikut:

<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <version>0.40.10-public</version>
</dependency>

Tipe Data

Tipe data yang didukung oleh MaxCompute MapReduce meliputi BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, dan DECIMAL. Tabel berikut menggambarkan pemetaan antara tipe data MaxCompute dan Java.

Tipe data MaxCompute

Tipe data Java

BIGINT

LONG

STRING

STRING

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATETIME

DATE

DECIMAL

BIGDECIMAL

Kelas MapReduce

Kelas

Deskripsi

MapperBase

Kelas dasar yang harus diwarisi oleh kelas mapper yang ditentukan pengguna. Mapper mengonversi rekaman dalam tabel input menjadi pasangan kunci-nilai dan meneruskan pasangan kunci-nilai tersebut ke reducer. Sebagai alternatif, mapper dapat menulis pasangan kunci-nilai ke tabel hasil dengan melewati tahap reduce. Pekerjaan yang melewati tahap reduce dan langsung mengembalikan hasil komputasi disebut pekerjaan MapOnly.

ReducerBase

Kelas dasar yang harus diwarisi oleh kelas reducer yang ditentukan pengguna. Reducer mereduksi satu set nilai yang terkait dengan suatu kunci.

TaskContext

Menggambarkan konteks suatu tugas. Konteks tugas adalah parameter masukan dari beberapa metode anggota MapperBase dan ReducerBase.

JobClient

Mendefinisikan klien pekerjaan. Klien pekerjaan mengirimkan dan mengelola pekerjaan. Klien pekerjaan dapat mengirimkan pekerjaan dalam mode blocking atau non-blocking. Mode blocking adalah mode sinkron, sedangkan mode non-blocking adalah mode asinkron.

RunningJob

Mendefinisikan pekerjaan yang sedang berjalan. Objek dari kelas ini digunakan untuk melacak instans pekerjaan MapReduce yang sedang berjalan.

JobConf

Menggambarkan konfigurasi pekerjaan MapReduce. Objek JobConf didefinisikan dalam fungsi utama. Kemudian, klien pekerjaan mengirimkan pekerjaan ke MaxCompute berdasarkan objek JobConf.

MapperBase

Tabel berikut menggambarkan metode dari kelas MapperBase.

Metode

Deskripsi

void cleanup(TaskContext context)

Metode yang dipanggil setelah metode map pada akhir tahap map.

void map(long key, Record record, TaskContext context)

Memproses rekaman dalam tabel input.

void setup(TaskContext context)

Metode yang dipanggil sebelum metode map pada awal tahap map.

ReducerBase

Tabel berikut menggambarkan metode dari kelas ReducerBase.

Metode

Deskripsi

void cleanup( TaskContext context)

Metode yang dipanggil setelah metode reduce pada akhir tahap reduce.

void reduce(Record key, Iterator<Record > values, TaskContext context)

Memproses rekaman dalam tabel input.

void setup( TaskContext context)

Metode yang dipanggil sebelum metode reduce pada awal tahap reduce.

TaskContext

Tabel berikut menggambarkan metode dari kelas TaskContext.

Metode

Deskripsi

TableInfo[] getOutputTableInfo()

Mendapatkan informasi tentang tabel output.

Record createOutputRecord()

Membuat rekaman untuk tabel output default.

Record createOutputRecord(String label)

Membuat rekaman untuk tabel output dengan label tertentu.

Record createMapOutputKeyRecord()

Membuat rekaman untuk kunci dalam pasangan kunci-nilai yang dihasilkan pada tahap map.

Record createMapOutputValueRecord()

Membuat rekaman untuk nilai dalam pasangan kunci-nilai yang dihasilkan pada tahap map.

void write(Record record)

Menulis rekaman ke tabel output default. Metode ini dapat dipanggil beberapa kali pada tahap reduce.

void write(Record record, String label)

Menulis rekaman ke tabel output dengan label tertentu. Metode ini dapat dipanggil beberapa kali pada tahap reduce.

void write(Record key, Record value)

Mengonversi rekaman menjadi pasangan kunci-nilai. Metode ini dapat dipanggil beberapa kali pada tahap map.

BufferedInputStream readResourceFileAsStream(String resourceName)

Membaca sumber daya file.

Iterator<Record > readResourceTable(String resourceName)

Membaca sumber daya tabel.

Counter getCounter(Enum<? > name)

Mendapatkan counter dengan nama tertentu.

Counter getCounter(String group, String name)

Mendapatkan counter dengan nama tertentu dalam grup tertentu.

void progress()

Mengirimkan informasi denyut jantung ke kerangka kerja MapReduce. Jika tugas Anda membutuhkan waktu lama untuk memproses data dan Anda tidak perlu memanggil kerangka kerja selama periode waktu ini, Anda dapat memanggil metode ini untuk menghindari timeout tugas. Periode timeout default untuk tugas adalah 600 detik.

  • Jika worker berjalan dalam waktu lama dan kerangka kerja menentukan bahwa worker timeout, kerangka kerja akan menghentikan worker. Dalam hal ini, Anda dapat memanggil metode progress dari kelas TaskContext untuk mencegah worker dihentikan oleh MapReduce. Metode progress mengirimkan informasi denyut jantung ke kerangka kerja. Metode progress tidak digunakan untuk melaporkan kemajuan worker.

  • Periode timeout default untuk worker adalah 10 menit dalam MaxCompute MapReduce. Anda tidak dapat mengubah periode timeout. Jika worker tidak mengirimkan informasi denyut jantung dengan memanggil metode progress dalam 10 menit, kerangka kerja akan menghentikan worker dan tugas map atau reduce gagal. Oleh karena itu, disarankan untuk secara berkala memanggil metode progress dalam tugas map atau reduce untuk mencegah kerangka kerja menghentikan worker secara tak terduga.

JobConf

Tabel berikut menggambarkan metode dari kelas JobConf.

Metode

Deskripsi

void setResources(String resourceNames)

Mendeklarasikan sumber daya yang digunakan dalam pekerjaan saat ini. Mapper atau reducer hanya dapat membaca sumber daya yang telah dideklarasikan dalam objek TaskContext.

void setMapOutputKeySchema(Column[] schema)

Menetapkan atribut kunci yang dilewatkan dari mapper ke reducer.

void setMapOutputValueSchema(Column[] schema)

Menetapkan atribut nilai yang dilewatkan dari mapper ke reducer.

void setOutputKeySortColumns(String[] cols)

Menetapkan kolom untuk mengurutkan kunci yang dilewatkan dari mapper ke reducer.

void setOutputGroupingColumns(String[] cols)

Menetapkan kolom untuk mengelompokkan kunci.

void setMapperClass(Class<? extends Mapper > theClass)

Menetapkan mapper untuk pekerjaan.

void setPartitionColumns(String[] cols)

Menetapkan kolom kunci partisi untuk pekerjaan. Secara default, kolom kunci partisi adalah semua kolom kunci yang dihasilkan oleh mapper.

void setReducerClass(Class<? extends Reducer > theClass)

Menetapkan reducer untuk pekerjaan.

void setCombinerClass(Class<? extends Reducer > theClass)

Menetapkan combiner untuk pekerjaan. Combiner menggabungkan rekaman dengan kunci yang sama. Ini mirip dengan reducer tetapi bekerja pada tahap map.

void setSplitSize(long size)

Menetapkan ukuran split, dalam MB. Ukuran split default adalah 256 MB.

void setNumReduceTasks(int n)

Menetapkan jumlah tugas reduce. Secara default, jumlah tugas reduce adalah seperempat dari jumlah tugas map.

void setMemoryForMapTask(int mem)

Menetapkan memori yang tersedia untuk worker dalam tugas map, dalam MB. Ukuran memori default adalah 2048 MB.

void setMemoryForReduceTask(int mem)

Menetapkan memori yang tersedia untuk worker dalam tugas reduce, dalam MB. Ukuran memori default adalah 2048 MB.

  • Kolom pengelompokan dipilih dari kolom sortir. Kolom sortir dan kolom kunci partisi harus ada dalam kunci.

  • Pada tahap map, nilai hash dari rekaman dari mapper dihitung berdasarkan kolom kunci partisi yang ditentukan. Nilai hash membantu menentukan reducer ke mana rekaman dilewatkan. Rekaman diurutkan berdasarkan kolom sortir sebelum rekaman dilewatkan ke reducer.

  • Pada tahap reduce, rekaman input dikelompokkan berdasarkan kolom pengelompokan. Kemudian, sekelompok rekaman yang memiliki kunci yang sama dilewatkan ke metode reduce sebagai satu input.

JobClient

Tabel berikut menggambarkan metode dari kelas JobClient.

Metode

Deskripsi

static RunningJob runJob(JobConf job)

Mengirimkan pekerjaan MapReduce dalam mode blocking (sinkron) dan menunggu hingga pekerjaan selesai sebelum mengembalikan.

static RunningJob submitJob(JobConf job)

Mengirimkan pekerjaan MapReduce dalam mode non-blocking dan mengembalikan objek RunningJob.

RunningJob

Tabel berikut menggambarkan metode dari kelas RunningJob.

Metode

Deskripsi

String getInstanceID()

Mendapatkan ID instans pekerjaan. Anda dapat menggunakan ID instans pekerjaan untuk melihat log operasional dan mengelola pekerjaan.

boolean isComplete()

Memeriksa apakah pekerjaan telah selesai.

boolean isSuccessful()

Memeriksa apakah instans pekerjaan berhasil.

void waitForCompletion()

Menunggu instans pekerjaan berakhir. Metode ini digunakan untuk pekerjaan yang dikirimkan dalam mode sinkron.

JobStatus getJobStatus()

Memeriksa status berjalan dari instans pekerjaan.

void killJob()

Mengakhiri pekerjaan saat ini.

Counters getCounters()

Mendapatkan informasi counter.

InputUtils

Tabel berikut menggambarkan metode dari kelas InputUtils.

Metode

Deskripsi

static void addTable(TableInfo table, JobConf conf)

Menambahkan tabel input ke tugas. Metode ini dapat dipanggil beberapa kali. Tabel baru ditambahkan ke antrian input.

static void setTables(TableInfo [] tables, JobConf conf)

Menambahkan beberapa tabel input ke tugas.

OutputUtils

Tabel berikut menggambarkan metode dari kelas OutputUtils.

Metode

Deskripsi

static void addTable(TableInfo table, JobConf conf)

Menambahkan tabel output ke tugas. Metode ini dapat dipanggil beberapa kali. Tabel baru ditambahkan ke antrian output.

static void setTables(TableInfo[] tables, JobConf conf)

Menambahkan beberapa tabel output ke tugas.

Pipeline

Pipeline adalah kelas utama dari model MapReduce yang diperluas. Anda dapat memanggil metode Pipeline.builder untuk membangun pipeline. Kode berikut menunjukkan metode dari kelas Pipeline:

    public Builder addMapper(Class<? extends Mapper> mapper)
    public Builder addMapper(Class<? extends Mapper> mapper,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder addReducer(Class<? extends Reducer> reducer)
    public Builder addReducer(Class<? extends Reducer> reducer,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder setOutputKeySchema(Column[] keySchema)
    public Builder setOutputValueSchema(Column[] valueSchema)
    public Builder setOutputKeySortColumns(String[] sortCols)
    public Builder setOutputKeySortOrder(SortOrder[] order)
    public Builder setPartitionColumns(String[] partCols)
    public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
    public Builder setOutputGroupingColumns(String[] cols)

Contoh berikut menunjukkan cara memanggil metode Pipeline.builder untuk membangun pipeline:

    Job job = new Job();
    Pipeline pipeline = Pipeline.builder()
     .addMapper(TokenizerMapper.class)
     .setOutputKeySchema(
         new Column[] { new Column("word", OdpsType.STRING) })
     .setOutputValueSchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .addReducer(SumReducer.class)
     .setOutputKeySchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .setOutputValueSchema(
         new Column[] { new Column("word", OdpsType.STRING),
         new Column("count", OdpsType.BIGINT) })
     .addReducer(IdentityReducer.class).createPipeline();
    job.setPipeline(pipeline);  
    job.addInput(...)
    job.addOutput(...)
    job.submit();

Seperti yang ditunjukkan dalam contoh sebelumnya, Anda dapat membuat pekerjaan MapReduce di mana mapper diikuti oleh dua reducer dalam fungsi utama. Jika Anda sudah familiar dengan fitur dasar MapReduce, model MapReduce yang diperluas mudah digunakan.

null
  • Sebelum menggunakan model MapReduce yang diperluas, kami sarankan Anda mempelajari cara menggunakan MapReduce.

  • Anda dapat membuat pekerjaan MapReduce di mana mapper diikuti hanya oleh satu reducer dengan menggunakan JobConf.