Bagian ini menjelaskan kelas dan metode umum dalam MapReduce.
Jika Anda menggunakan Maven, Anda dapat mencari odps-sdk-mapred di repositori Maven untuk menemukan versi terbaru dari SDK untuk Java. Anda dapat mendeklarasikan SDK dalam proyek Anda dengan menggunakan dependensi Maven berikut:
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.40.10-public</version>
</dependency>Tipe Data
Tipe data yang didukung oleh MaxCompute MapReduce meliputi BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, dan DECIMAL. Tabel berikut menggambarkan pemetaan antara tipe data MaxCompute dan Java.
Tipe data MaxCompute | Tipe data Java |
BIGINT | LONG |
STRING | STRING |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | DATE |
DECIMAL | BIGDECIMAL |
Kelas MapReduce
Kelas | Deskripsi |
MapperBase | Kelas dasar yang harus diwarisi oleh kelas mapper yang ditentukan pengguna. Mapper mengonversi rekaman dalam tabel input menjadi pasangan kunci-nilai dan meneruskan pasangan kunci-nilai tersebut ke reducer. Sebagai alternatif, mapper dapat menulis pasangan kunci-nilai ke tabel hasil dengan melewati tahap reduce. Pekerjaan yang melewati tahap reduce dan langsung mengembalikan hasil komputasi disebut pekerjaan MapOnly. |
ReducerBase | Kelas dasar yang harus diwarisi oleh kelas reducer yang ditentukan pengguna. Reducer mereduksi satu set nilai yang terkait dengan suatu kunci. |
TaskContext | Menggambarkan konteks suatu tugas. Konteks tugas adalah parameter masukan dari beberapa metode anggota MapperBase dan ReducerBase. |
JobClient | Mendefinisikan klien pekerjaan. Klien pekerjaan mengirimkan dan mengelola pekerjaan. Klien pekerjaan dapat mengirimkan pekerjaan dalam mode blocking atau non-blocking. Mode blocking adalah mode sinkron, sedangkan mode non-blocking adalah mode asinkron. |
RunningJob | Mendefinisikan pekerjaan yang sedang berjalan. Objek dari kelas ini digunakan untuk melacak instans pekerjaan MapReduce yang sedang berjalan. |
JobConf | Menggambarkan konfigurasi pekerjaan MapReduce. Objek JobConf didefinisikan dalam fungsi utama. Kemudian, klien pekerjaan mengirimkan pekerjaan ke MaxCompute berdasarkan objek JobConf. |
MapperBase
Tabel berikut menggambarkan metode dari kelas MapperBase.
Metode | Deskripsi |
void cleanup(TaskContext context) | Metode yang dipanggil setelah metode map pada akhir tahap map. |
void map(long key, Record record, TaskContext context) | Memproses rekaman dalam tabel input. |
void setup(TaskContext context) | Metode yang dipanggil sebelum metode map pada awal tahap map. |
ReducerBase
Tabel berikut menggambarkan metode dari kelas ReducerBase.
Metode | Deskripsi |
void cleanup( TaskContext context) | Metode yang dipanggil setelah metode reduce pada akhir tahap reduce. |
void reduce(Record key, Iterator<Record > values, TaskContext context) | Memproses rekaman dalam tabel input. |
void setup( TaskContext context) | Metode yang dipanggil sebelum metode reduce pada awal tahap reduce. |
TaskContext
Tabel berikut menggambarkan metode dari kelas TaskContext.
Metode | Deskripsi |
TableInfo[] getOutputTableInfo() | Mendapatkan informasi tentang tabel output. |
Record createOutputRecord() | Membuat rekaman untuk tabel output default. |
Record createOutputRecord(String label) | Membuat rekaman untuk tabel output dengan label tertentu. |
Record createMapOutputKeyRecord() | Membuat rekaman untuk kunci dalam pasangan kunci-nilai yang dihasilkan pada tahap map. |
Record createMapOutputValueRecord() | Membuat rekaman untuk nilai dalam pasangan kunci-nilai yang dihasilkan pada tahap map. |
void write(Record record) | Menulis rekaman ke tabel output default. Metode ini dapat dipanggil beberapa kali pada tahap reduce. |
void write(Record record, String label) | Menulis rekaman ke tabel output dengan label tertentu. Metode ini dapat dipanggil beberapa kali pada tahap reduce. |
void write(Record key, Record value) | Mengonversi rekaman menjadi pasangan kunci-nilai. Metode ini dapat dipanggil beberapa kali pada tahap map. |
BufferedInputStream readResourceFileAsStream(String resourceName) | Membaca sumber daya file. |
Iterator<Record > readResourceTable(String resourceName) | Membaca sumber daya tabel. |
Counter getCounter(Enum<? > name) | Mendapatkan counter dengan nama tertentu. |
Counter getCounter(String group, String name) | Mendapatkan counter dengan nama tertentu dalam grup tertentu. |
void progress() | Mengirimkan informasi denyut jantung ke kerangka kerja MapReduce. Jika tugas Anda membutuhkan waktu lama untuk memproses data dan Anda tidak perlu memanggil kerangka kerja selama periode waktu ini, Anda dapat memanggil metode ini untuk menghindari timeout tugas. Periode timeout default untuk tugas adalah 600 detik. |
Jika worker berjalan dalam waktu lama dan kerangka kerja menentukan bahwa worker timeout, kerangka kerja akan menghentikan worker. Dalam hal ini, Anda dapat memanggil metode progress dari kelas TaskContext untuk mencegah worker dihentikan oleh MapReduce. Metode progress mengirimkan informasi denyut jantung ke kerangka kerja. Metode progress tidak digunakan untuk melaporkan kemajuan worker.
Periode timeout default untuk worker adalah 10 menit dalam MaxCompute MapReduce. Anda tidak dapat mengubah periode timeout. Jika worker tidak mengirimkan informasi denyut jantung dengan memanggil metode progress dalam 10 menit, kerangka kerja akan menghentikan worker dan tugas map atau reduce gagal. Oleh karena itu, disarankan untuk secara berkala memanggil metode progress dalam tugas map atau reduce untuk mencegah kerangka kerja menghentikan worker secara tak terduga.
JobConf
Tabel berikut menggambarkan metode dari kelas JobConf.
Metode | Deskripsi |
void setResources(String resourceNames) | Mendeklarasikan sumber daya yang digunakan dalam pekerjaan saat ini. Mapper atau reducer hanya dapat membaca sumber daya yang telah dideklarasikan dalam objek TaskContext. |
void setMapOutputKeySchema(Column[] schema) | Menetapkan atribut kunci yang dilewatkan dari mapper ke reducer. |
void setMapOutputValueSchema(Column[] schema) | Menetapkan atribut nilai yang dilewatkan dari mapper ke reducer. |
void setOutputKeySortColumns(String[] cols) | Menetapkan kolom untuk mengurutkan kunci yang dilewatkan dari mapper ke reducer. |
void setOutputGroupingColumns(String[] cols) | Menetapkan kolom untuk mengelompokkan kunci. |
void setMapperClass(Class<? extends Mapper > theClass) | Menetapkan mapper untuk pekerjaan. |
void setPartitionColumns(String[] cols) | Menetapkan kolom kunci partisi untuk pekerjaan. Secara default, kolom kunci partisi adalah semua kolom kunci yang dihasilkan oleh mapper. |
void setReducerClass(Class<? extends Reducer > theClass) | Menetapkan reducer untuk pekerjaan. |
void setCombinerClass(Class<? extends Reducer > theClass) | Menetapkan combiner untuk pekerjaan. Combiner menggabungkan rekaman dengan kunci yang sama. Ini mirip dengan reducer tetapi bekerja pada tahap map. |
void setSplitSize(long size) | Menetapkan ukuran split, dalam MB. Ukuran split default adalah 256 MB. |
void setNumReduceTasks(int n) | Menetapkan jumlah tugas reduce. Secara default, jumlah tugas reduce adalah seperempat dari jumlah tugas map. |
void setMemoryForMapTask(int mem) | Menetapkan memori yang tersedia untuk worker dalam tugas map, dalam MB. Ukuran memori default adalah 2048 MB. |
void setMemoryForReduceTask(int mem) | Menetapkan memori yang tersedia untuk worker dalam tugas reduce, dalam MB. Ukuran memori default adalah 2048 MB. |
Kolom pengelompokan dipilih dari kolom sortir. Kolom sortir dan kolom kunci partisi harus ada dalam kunci.
Pada tahap map, nilai hash dari rekaman dari mapper dihitung berdasarkan kolom kunci partisi yang ditentukan. Nilai hash membantu menentukan reducer ke mana rekaman dilewatkan. Rekaman diurutkan berdasarkan kolom sortir sebelum rekaman dilewatkan ke reducer.
Pada tahap reduce, rekaman input dikelompokkan berdasarkan kolom pengelompokan. Kemudian, sekelompok rekaman yang memiliki kunci yang sama dilewatkan ke metode reduce sebagai satu input.
JobClient
Tabel berikut menggambarkan metode dari kelas JobClient.
Metode | Deskripsi |
static RunningJob runJob(JobConf job) | Mengirimkan pekerjaan MapReduce dalam mode blocking (sinkron) dan menunggu hingga pekerjaan selesai sebelum mengembalikan. |
static RunningJob submitJob(JobConf job) | Mengirimkan pekerjaan MapReduce dalam mode non-blocking dan mengembalikan objek RunningJob. |
RunningJob
Tabel berikut menggambarkan metode dari kelas RunningJob.
Metode | Deskripsi |
String getInstanceID() | Mendapatkan ID instans pekerjaan. Anda dapat menggunakan ID instans pekerjaan untuk melihat log operasional dan mengelola pekerjaan. |
boolean isComplete() | Memeriksa apakah pekerjaan telah selesai. |
boolean isSuccessful() | Memeriksa apakah instans pekerjaan berhasil. |
void waitForCompletion() | Menunggu instans pekerjaan berakhir. Metode ini digunakan untuk pekerjaan yang dikirimkan dalam mode sinkron. |
JobStatus getJobStatus() | Memeriksa status berjalan dari instans pekerjaan. |
void killJob() | Mengakhiri pekerjaan saat ini. |
Counters getCounters() | Mendapatkan informasi counter. |
InputUtils
Tabel berikut menggambarkan metode dari kelas InputUtils.
Metode | Deskripsi |
static void addTable(TableInfo table, JobConf conf) | Menambahkan tabel input ke tugas. Metode ini dapat dipanggil beberapa kali. Tabel baru ditambahkan ke antrian input. |
static void setTables(TableInfo [] tables, JobConf conf) | Menambahkan beberapa tabel input ke tugas. |
OutputUtils
Tabel berikut menggambarkan metode dari kelas OutputUtils.
Metode | Deskripsi |
static void addTable(TableInfo table, JobConf conf) | Menambahkan tabel output ke tugas. Metode ini dapat dipanggil beberapa kali. Tabel baru ditambahkan ke antrian output. |
static void setTables(TableInfo[] tables, JobConf conf) | Menambahkan beberapa tabel output ke tugas. |
Pipeline
Pipeline adalah kelas utama dari model MapReduce yang diperluas. Anda dapat memanggil metode Pipeline.builder untuk membangun pipeline. Kode berikut menunjukkan metode dari kelas Pipeline:
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)Contoh berikut menunjukkan cara memanggil metode Pipeline.builder untuk membangun pipeline:
Job job = new Job();
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)
.setOutputKeySchema(
new Column[] { new Column("word", OdpsType.STRING) })
.setOutputValueSchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.addReducer(SumReducer.class)
.setOutputKeySchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.setOutputValueSchema(
new Column[] { new Column("word", OdpsType.STRING),
new Column("count", OdpsType.BIGINT) })
.addReducer(IdentityReducer.class).createPipeline();
job.setPipeline(pipeline);
job.addInput(...)
job.addOutput(...)
job.submit();Seperti yang ditunjukkan dalam contoh sebelumnya, Anda dapat membuat pekerjaan MapReduce di mana mapper diikuti oleh dua reducer dalam fungsi utama. Jika Anda sudah familiar dengan fitur dasar MapReduce, model MapReduce yang diperluas mudah digunakan.
Sebelum menggunakan model MapReduce yang diperluas, kami sarankan Anda mempelajari cara menggunakan MapReduce.
Anda dapat membuat pekerjaan MapReduce di mana mapper diikuti hanya oleh satu reducer dengan menggunakan JobConf.