Topik ini menjelaskan contoh penggunaan MultiJobs di MapReduce.
Prasyarat
Selesaikan konfigurasi lingkungan untuk pengujian, lihat Memulai.
Persiapan
Siapkan paket JAR program uji. Dalam topik ini, paket JAR diberi nama mapreduce-examples.jar dan disimpan di direktori bin\data\resources dalam jalur instalasi lokal MaxCompute.
Siapkan tabel uji dan sumber daya untuk MultiJobs.
Buat tabel uji.
CREATE TABLE mr_empty (key STRING, value STRING); CREATE TABLE mr_multijobs_out (value BIGINT);Tambahkan sumber daya uji.
add table mr_multijobs_out as multijobs_res_table -f; -- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f. add jar data\resources\mapreduce-examples.jar -f;
Prosedur
Jalankan MultiJobs pada klien MaxCompute.
jar -resources mapreduce-examples.jar,multijobs_res_table -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;Hasil yang Diharapkan
Pekerjaan berjalan normal. Data berikut dikembalikan di tabel mr_multijobs_out:
+------------+
| value |
+------------+
| 0 |
+------------+Kode Contoh
Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.
package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
* MultiJobs
*
* Menjalankan beberapa pekerjaan
*
**/
public class MultiJobs {
public static class InitMapper extends MapperBase {
@Override
public void setup(TaskContext context) throws IOException {
Record record = context.createOutputRecord();
long v = context.getJobConf().getLong("multijobs.value", 2);
record.set(0, v);
context.write(record);
}
}
public static class DecreaseMapper extends MapperBase {
@Override
public void cleanup(TaskContext context) throws IOException {
/** Memperoleh nilai variabel yang didefinisikan di fungsi utama dari JobConf. */
long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
long v = -1;
int count = 0;
/** Membaca data dari tabel keluaran pekerjaan sebelumnya. */
Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
while (iter.hasNext()) {
Record r = iter.next();
v = (Long) r.get(0);
if (expect != v) {
throw new IOException("harap: " + expect + ", tetapi: " + v);
}
count++;
}
if (count != 1) {
throw new IOException("res_table harus memiliki 1 rekaman, tetapi: " + count);
}
Record record = context.createOutputRecord();
v--;
record.set(0, v);
context.write(record);
/** Mengatur penghitung. Nilai penghitung dapat diperoleh di fungsi utama setelah pekerjaan selesai. */
context.getCounter("multijobs", "value").setValue(v);
}
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Penggunaan: TestMultiJobs <tabel>");
System.exit(1);
}
String tbl = args[0];
long iterCount = 2;
System.err.println("Mulai menjalankan pekerjaan init.");
JobConf initJob = new JobConf();
initJob.setLong("multijobs.value", iterCount);
initJob.setMapperClass(InitMapper.class);
InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
/** Secara eksplisit atur jumlah reducer ke 0 untuk pekerjaan hanya-pemetaan. */
initJob.setNumReduceTasks(0);
JobClient.runJob(initJob);
while (true) {
System.err.println("Mulai menjalankan pekerjaan iter, hitungan: " + iterCount);
JobConf decJob = new JobConf();
decJob.setLong("multijobs.expect.value", iterCount);
decJob.setMapperClass(DecreaseMapper.class);
InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
/** Secara eksplisit atur jumlah reducer ke 0 untuk pekerjaan hanya-pemetaan. */
decJob.setNumReduceTasks(0);
RunningJob rJob = JobClient.runJob(decJob);
iterCount--;
/** Jika jumlah iterasi yang ditentukan tercapai, keluar dari loop. */
if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
break;
}
}
if (iterCount != 0) {
throw new IOException("Pekerjaan gagal.");
}
}
}