Topik ini menjelaskan contoh penggunaan sumber daya dalam MapReduce untuk membantu Anda mengalokasikan dan mengelola sumber daya komputasi secara lebih efisien selama pemrosesan data besar, serta meningkatkan kinerja eksekusi tugas.
Prasyarat
Selesaikan konfigurasi lingkungan untuk pengujian. Lihat Memulai.
Persiapan
Siapkan paket JAR program uji. Dalam topik ini, paket JAR diberi nama mapreduce-examples.jar dan disimpan di direktori bin\data\resources pada jalur instalasi lokal MaxCompute.
Di klien MaxCompute, lakukan langkah-langkah berikut untuk menyiapkan tabel uji dan sumber daya:
Buat tabel uji.
CREATE TABLE mr_upload_src(key BIGINT, value STRING);Tambahkan sumber daya uji.
add file data\resources\import.txt -f; -- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f. add jar data\resources\mapreduce-examples.jar -f;File import.txt berisi konten berikut:
1000,odps
Prosedur
Jalankan kode berikut di klien MaxCompute untuk mengunggah data dari sumber daya uji ke tabel mr_upload_src:
jar -resources mapreduce-examples.jar,import.txt -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Upload import.txt mr_upload_src;Hasil yang diharapkan
Tugas berjalan normal. Data berikut dikembalikan di tabel mr_upload_src:
+------------+------------+
| key | value |
+------------+------------+
| 1000 | odps |
+------------+------------+Kode sampel
Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.
Kode sampel:
package com.aliyun.odps.mapred.open.example;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
* Upload
* Impor data dari file teks ke tabel
**/
public class Upload {
public static class UploadMapper extends MapperBase {
@Override
public void setup(TaskContext context) throws IOException {
Record record = context.createOutputRecord();
StringBuilder importdata = new StringBuilder();
BufferedInputStream bufferedInput = null;
try {
byte[] buffer = new byte[1024];
int bytesRead = 0;
String filename = context.getJobConf().get("import.filename");
bufferedInput = context.readResourceFileAsStream(filename);
while ((bytesRead = bufferedInput.read(buffer)) != -1) {
String chunk = new String(buffer, 0, bytesRead);
importdata.append(chunk);
}
String lines[] = importdata.toString().split("\n");
for (int i = 0; i < lines.length; i++) {
String[] ss = lines[i].split(",");
record.set(0, Long.parseLong(ss[0].trim()));
record.set(1, ss[1].trim());
context.write(record);
}
} catch (FileNotFoundException ex) {
throw new IOException(ex);
} catch (IOException ex) {
throw new IOException(ex);
} finally {
}
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException {
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: Upload <import_txt> <out_table>");
System.exit(2);
}
JobConf job = new JobConf();
job.setMapperClass(UploadMapper.class);
/** Tentukan nama sumber daya, yang dapat diperoleh di tahap map menggunakan antarmuka JobConf. */
job.set("import.filename", args[0]);
/** Secara eksplisit atur jumlah reducer menjadi 0 untuk pekerjaan hanya-map. */
job.setNumReduceTasks(0);
job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint"));
job.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), job);
OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
JobClient.runJob(job);
}
}Anda dapat menggunakan salah satu metode berikut untuk mengatur file konfigurasi JobConf:
Gunakan antarmuka JobConf di SDK. Metode ini digunakan dalam contoh sebelumnya.
Gunakan parameter -conf dalam perintah jar untuk menentukan file konfigurasi JobConf baru.