Topik ini menjelaskan cara menjalankan SecondarySort di MapReduce.
Prasyarat
Selesaikan konfigurasi lingkungan untuk pengujian, lihat Memulai.
Persiapan
Paket JAR program uji telah disiapkan. Dalam topik ini, paket JAR bernama mapreduce-examples.jar dan disimpan di direktori bin\data\resources pada jalur instalasi lokal MaxCompute.
Siapkan tabel uji dan sumber daya untuk SecondarySort.
Buat tabel uji.
CREATE TABLE ss_in(key BIGINT, value BIGINT); CREATE TABLE ss_out(key BIGINT, value BIGINT);Tambahkan sumber daya uji.
-- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f. add jar data\resources\mapreduce-examples.jar -f;
Gunakan Tunnel untuk mengimpor file
data.txtdari direktori bin klien MaxCompute ke tabelss_in.tunnel upload data.txt ss_in;Data berikut diimpor ke tabel ss_in:
1,2 2,1 1,1 2,2
Prosedur
Jalankan SecondarySort pada klien MaxCompute.
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.SecondarySort ss_in ss_out;Hasil yang Diharapkan
Pekerjaan berjalan normal. Data berikut dikembalikan di tabel ss_out:
+------------+------------+
| key | value |
+------------+------------+
| 1 | 1 |
| 1 | 2 |
| 2 | 1 |
| 2 | 2 |
+------------+------------+Kode Contoh
Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.
package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.data.TableInfo;
/**
*
* Ini adalah contoh aplikasi ODPS Map/Reduce. Aplikasi membaca tabel input yang
* harus berisi dua bilangan bulat per rekaman. Output diurutkan berdasarkan angka pertama dan
* kedua serta dikelompokkan berdasarkan angka pertama.
*
**/
public class SecondarySort {
/**
* Baca dua bilangan bulat dari setiap baris dan hasilkan pasangan kunci, nilai sebagai ((kiri,
* kanan), kanan).
**/
public static class MapClass extends MapperBase {
private Record key;
private Record value;
@Override
public void setup(TaskContext context) throws IOException {
key = context.createMapOutputKeyRecord();
value = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException {
long left = 0;
long right = 0;
if (record.getColumnCount() > 0) {
left = (Long) record.get(0);
if (record.getColumnCount() > 1) {
right = (Long) record.get(1);
}
key.set(new Object[] { (Long) left, (Long) right });
value.set(new Object[] { (Long) right });
context.write(key, value);
}
}
}
/**
* Kelas reducer yang hanya mengeluarkan jumlah dari nilai input.
**/
public static class ReduceClass extends ReducerBase {
private Record result = null;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
result.set(0, key.get(0));
while (values.hasNext()) {
Record value = values.next();
result.set(1, value.get(0));
context.write(result);
}
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: secondarysrot <in> <out>");
System.exit(2);
}
JobConf job = new JobConf();
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
/** Tetapkan beberapa kolom sebagai kunci. */
//bandingkan bagian pertama dan kedua dari pasangan
job.setOutputKeySortColumns(new String[] { "i1", "i2" });
//partisi berdasarkan bagian pertama dari pasangan
job.setPartitionColumns(new String[] { "i1" });
//pengelompokan berdasarkan bagian pertama dari pasangan
job.setOutputGroupingColumns(new String[] { "i1" });
//output peta adalah LongPair, Long
job.setMapOutputKeySchema(SchemaUtils.fromString("i1:bigint,i2:bigint"));
job.setMapOutputValueSchema(SchemaUtils.fromString("i2x:bigint"));
InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
JobClient.runJob(job);
System.exit(0);
}
}