全部产品
Search
文档中心

MaxCompute:Contoh MultiJobs

更新时间:Jun 19, 2025

Topik ini menjelaskan contoh penggunaan MultiJobs di MapReduce.

Prasyarat

Selesaikan konfigurasi lingkungan untuk pengujian, lihat Memulai.

Persiapan

  1. Siapkan paket JAR program uji. Dalam topik ini, paket JAR diberi nama mapreduce-examples.jar dan disimpan di direktori bin\data\resources dalam jalur instalasi lokal MaxCompute.

  2. Siapkan tabel uji dan sumber daya untuk MultiJobs.

    1. Buat tabel uji.

      CREATE TABLE mr_empty (key STRING, value STRING);
      CREATE TABLE mr_multijobs_out (value BIGINT);
    2. Tambahkan sumber daya uji.

      add table mr_multijobs_out as multijobs_res_table -f;
      
      -- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f.
      add jar data\resources\mapreduce-examples.jar -f;

Prosedur

Jalankan MultiJobs pada klien MaxCompute.

jar -resources mapreduce-examples.jar,multijobs_res_table -classpath data\resources\mapreduce-examples.jar 
 com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;

Hasil yang Diharapkan

Pekerjaan berjalan normal. Data berikut dikembalikan di tabel mr_multijobs_out:

+------------+
| value      |
+------------+
| 0          |
+------------+

Kode Contoh

Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
     * MultiJobs
     *
     * Menjalankan beberapa pekerjaan
     *
     **/
public class MultiJobs {
    public static class InitMapper extends MapperBase {
        @Override
            public void setup(TaskContext context) throws IOException {
            Record record = context.createOutputRecord();
            long v = context.getJobConf().getLong("multijobs.value", 2);
            record.set(0, v);
            context.write(record);
        }
    }
    public static class DecreaseMapper extends MapperBase {
        @Override
            public void cleanup(TaskContext context) throws IOException {
            /** Memperoleh nilai variabel yang didefinisikan di fungsi utama dari JobConf. */
            long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
            long v = -1;
            int count = 0;
            /** Membaca data dari tabel keluaran pekerjaan sebelumnya. */
            Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
            while (iter.hasNext()) {
                Record r = iter.next();
                v = (Long) r.get(0);
                if (expect != v) {
                    throw new IOException("harap: " + expect + ", tetapi: " + v);
                }
                count++;
            }
            if (count != 1) {
                throw new IOException("res_table harus memiliki 1 rekaman, tetapi: " + count);
            }
            Record record = context.createOutputRecord();
            v--;
            record.set(0, v);
            context.write(record);
            /** Mengatur penghitung. Nilai penghitung dapat diperoleh di fungsi utama setelah pekerjaan selesai. */
            context.getCounter("multijobs", "value").setValue(v);
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Penggunaan: TestMultiJobs <tabel>");
            System.exit(1);
        }
        String tbl = args[0];
        long iterCount = 2;
        System.err.println("Mulai menjalankan pekerjaan init.");
        JobConf initJob = new JobConf();
        initJob.setLong("multijobs.value", iterCount);
        initJob.setMapperClass(InitMapper.class);
        InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
        OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
        initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
        initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
        /** Secara eksplisit atur jumlah reducer ke 0 untuk pekerjaan hanya-pemetaan. */
        initJob.setNumReduceTasks(0);
        JobClient.runJob(initJob);
        while (true) {
            System.err.println("Mulai menjalankan pekerjaan iter, hitungan: " + iterCount);
            JobConf decJob = new JobConf();
            decJob.setLong("multijobs.expect.value", iterCount);
            decJob.setMapperClass(DecreaseMapper.class);
            InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
            OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
            /** Secara eksplisit atur jumlah reducer ke 0 untuk pekerjaan hanya-pemetaan. */
            decJob.setNumReduceTasks(0);
            RunningJob rJob = JobClient.runJob(decJob);
            iterCount--;
            /** Jika jumlah iterasi yang ditentukan tercapai, keluar dari loop. */
            if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
                break;
            }
        }
        if (iterCount != 0) {
            throw new IOException("Pekerjaan gagal.");
        }
    }
}