全部产品
Search
文档中心

DataWorks:EMR MR node

更新时间:Oct 28, 2025

Anda dapat membuat node EMR MR E-MapReduce (EMR) untuk memproses set data besar menggunakan beberapa tugas map paralel. Node EMR MR membantu meningkatkan efisiensi pemrosesan data secara signifikan. Topik ini menjelaskan cara mengembangkan dan mengonfigurasi node EMR MR. Dalam contoh ini, node digunakan untuk membaca data dari objek Object Storage Service (OSS) di bucket OSS dan menghitung jumlah kata dalam objek tersebut.

Prasyarat

  • Anda telah membuat kluster EMR Alibaba Cloud dan menambahkannya ke DataWorks. Untuk informasi lebih lanjut, lihat Pengembangan Data (Baru): Menyambungkan sumber daya komputasi EMR.

  • (Opsional) Jika Anda adalah Pengguna Resource Access Management (RAM), pastikan Anda telah ditambahkan ke ruang kerja untuk pengembangan tugas dan diberi peran Developer atau Workspace Administrator. Peran Administrator Ruang Kerja memiliki izin yang luas. Berikan peran ini dengan hati-hati. Untuk informasi lebih lanjut tentang menambahkan anggota, lihat Tambahkan anggota ke ruang kerja.

    Jika Anda menggunakan Akun Alibaba Cloud, Anda dapat melewati langkah ini.
  • Kode sumber terbuka diunggah sebagai sumber daya atau fungsi yang ditentukan pengguna (UDF) diunggah sebagai sumber daya di panel RESOURCE MANAGEMENT: ALL. Prasyarat ini harus dipenuhi jika Anda ingin mereferensikan kode sumber terbuka atau UDF dalam node EMR MR Anda. Untuk informasi lebih lanjut, lihat Manajemen Sumber Daya.

  • Bucket OSS dibuat. Untuk menggunakan kode sampel untuk pengembangan tugas dalam topik ini, Anda harus menyiapkan bucket OSS. Untuk informasi lebih lanjut tentang cara membuat bucket OSS, lihat Buat bucket.

  • Node EMR MR dibuat. Untuk informasi lebih lanjut, lihat Buat node untuk alur kerja yang dipicu otomatis.

Batasan

  • Jenis node ini hanya dapat dijalankan pada grup sumber daya tanpa server atau grup sumber daya eksklusif untuk penjadwalan. Kami merekomendasikan Anda menggunakan grup sumber daya tanpa server.

  • Jika Anda ingin mengelola metadata untuk DataLake atau kluster kustom di DataWorks, Anda harus mengonfigurasi EMR-HOOK di kluster terlebih dahulu. Untuk informasi lebih lanjut tentang cara mengonfigurasi EMR-HOOK, lihat Gunakan fitur ekstensi Hive untuk mencatat alur data dan informasi akses historis.

    Catatan

    Jika Anda tidak mengonfigurasi EMR-HOOK di kluster Anda, metadata tidak dapat ditampilkan secara real-time, log audit tidak dapat dihasilkan, dan alur data tidak dapat ditampilkan di DataWorks. Tugas tata kelola EMR juga tidak dapat dijalankan.

Siapkan data awal dan paket sumber daya JAR

Siapkan data awal

Buat file bernama input01.txt yang berisi data awal berikut:

hadoop emr hadoop dw
hive hadoop
dw emr

Unggah file yang menyimpan data awal

  1. Masuk ke Konsol OSS. Di bilah navigasi kiri, klik Buckets.

  2. Di halaman Bucket, temukan bucket yang diinginkan dan klik nama bucket untuk masuk ke halaman Objects.

    Dalam contoh ini, bucket onaliyun-bucket-2 digunakan.

  3. Di halaman Objek, klik Create Directory untuk membuat direktori yang digunakan untuk menyimpan data awal dan sumber daya JAR.

    • Atur Directory Name menjadi emr/datas/wordcount02/inputs untuk membuat direktori yang digunakan untuk menyimpan data awal.

    • Atur Directory Name menjadi emr/jars untuk membuat direktori yang digunakan untuk menyimpan sumber daya JAR.

  4. Unggah file yang menyimpan data awal ke direktori emr/datas/wordcount02/inputs.

    • Pergi ke direktori /emr/datas/wordcount02/inputs dan klik Upload Object.

    • Di bagian Files to Upload, klik Select Files dan unggah file input01.txt ke bucket.

Gunakan node EMR MR untuk membaca objek OSS dan menghasilkan paket JAR

  1. Buka proyek IntelliJ IDEA yang ada dan tambahkan dependensi Project Object Model (POM).

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!--Versi yang digunakan oleh EMR MR adalah 2.8.5.-->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. Konfigurasikan parameter berikut untuk membaca data dari dan menulis data ke objek OSS.

    Penting

    Akun Alibaba Cloud memiliki izin untuk memanggil semua operasi API. Jika pasangan Kunci Akses akun Alibaba Cloud Anda bocor, semua sumber daya di akun Alibaba Cloud Anda mungkin terpapar risiko keamanan tinggi. Kami merekomendasikan agar Anda tidak menyimpan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda ke dalam kode proyek atau posisi yang mudah ditemukan. Kami merekomendasikan agar Anda menggunakan Pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. Contoh kode berikut hanya disediakan untuk referensi. Simpan pasangan Kunci Akses akun Alibaba Cloud Anda tetap rahasia.

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    Deskripsi parameter:

    • ${accessKeyId}: ID AccessKey akun Alibaba Cloud Anda.

    • ${accessKeySecret}: Rahasia AccessKey akun Alibaba Cloud Anda.

    • ${endpoint}: titik akhir OSS. Titik akhir ditentukan oleh wilayah tempat kluster EMR Anda berada. Anda harus mengaktifkan OSS di wilayah tempat kluster EMR Anda berada. Untuk informasi lebih lanjut, lihat Wilayah dan titik akhir.

    Dalam topik ini, kode Java digunakan untuk memodifikasi contoh WordCount di situs resmi Hadoop. Konfigurasi ID AccessKey dan Rahasia AccessKey ditambahkan ke kode. Ini memberikan izin kepada pekerjaan untuk mengakses objek OSS.

    Contoh kode

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
            conf.set("fs.oss.endpoint", "${endpoint}"); //
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ?  0 : 1);
        }
    }
                                    
  3. Setelah Anda menulis kode sebelumnya, kompres kode tersebut menjadi paket JAR. Dalam contoh ini, paket bernama onaliyun_mr_wordcount-1.0-SNAPSHOT.jar dihasilkan.

Prosedur

  1. Di tab konfigurasi node EMR MR, lakukan operasi berikut:

    Kembangkan tugas EMR MR

    Anda dapat menggunakan salah satu metode berikut berdasarkan kebutuhan bisnis Anda untuk mengembangkan tugas EMR MR:

    Metode 1: Unggah dan referensikan sumber daya EMR JAR

    DataWorks memungkinkan Anda mengunggah sumber daya dari mesin lokal Anda ke Data Studio sebelum Anda dapat mereferensikan sumber daya tersebut. Jika node EMR MR bergantung pada banyak sumber daya, sumber daya tersebut tidak dapat diunggah menggunakan konsol DataWorks. Dalam kasus ini, Anda dapat menyimpan sumber daya di Sistem File Terdistribusi Hadoop (HDFS) dan mereferensikan sumber daya dalam kode node EMR MR.

    1. Buat sumber daya EMR JAR.

      1. Untuk informasi lebih lanjut tentang cara membuat sumber daya EMR JAR, lihat Manajemen Sumber Daya. Dalam contoh ini, paket JAR yang dihasilkan di bagian Siapkan data awal dan paket sumber daya JAR dalam topik ini disimpan di direktori emr/jars. Klik Upload untuk mengunggah paket JAR.

      2. Konfigurasikan parameter Storage Path, Data Source, dan Resource Group.

      3. Klik Save.

      image

    2. Referensikan sumber daya EMR JAR.

      1. Buka node EMR MR. Tab konfigurasi node muncul.

      2. Temukan sumber daya yang ingin direferensikan di panel RESOURCE MANAGEMENT: ALL di bilah navigasi kiri halaman Data Studio, klik kanan nama sumber daya, dan pilih Reference Resources. Dalam contoh ini, sumber daya adalah onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.

      3. Jika informasi dalam format ##@resource_reference{""} muncul di tab konfigurasi node EMR MR, sumber daya kode direferensikan. Kemudian, jalankan kode berikut. Anda harus mengganti informasi dalam kode berikut dengan informasi aktual. Informasi tersebut mencakup nama paket sumber daya, nama bucket, dan jalur.

        ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
        onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
        Catatan

        Anda tidak dapat menambahkan komentar saat menulis kode untuk node EMR MR.

    Metode 2: Referensikan sumber daya OSS

    Node saat ini dapat mereferensikan sumber daya OSS menggunakan metode OSS REF. Saat Anda menjalankan node, DataWorks secara otomatis memuat sumber daya OSS yang ditentukan dalam kode node. Metode ini umumnya digunakan dalam skenario di mana dependensi JAR diperlukan dalam tugas EMR atau tugas EMR perlu bergantung pada skrip.

    1. Unggah file JAR.

      1. Masuk ke Konsol OSS. Di bilah navigasi atas, pilih wilayah yang diinginkan. Kemudian, di bilah navigasi kiri, klik Buckets.

      2. Di halaman Bucket, temukan bucket yang diinginkan dan klik nama bucket untuk masuk ke halaman Objects.

        Dalam contoh ini, bucket onaliyun-bucket-2 digunakan.

      3. Unggah file JAR ke direktori yang dibuat.

        Pergi ke direktori emr/jars. Klik Upload Object. Di bagian Files to Upload, klik Select Files dan tambahkan file onaliyun_mr_wordcount-1.0-SNAPSHOT.jar. Kemudian, klik Upload Object.

    2. Referensikan file JAR.

      Tulis kode yang digunakan untuk mereferensikan file JAR.

      Di tab konfigurasi node EMR MR, tulis kode yang digunakan untuk mereferensikan file JAR.

      hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      Catatan

      Perintah sebelumnya dalam format berikut: hadoop jar <Jalur tempat file JAR yang akan direferensikan dan dijalankan disimpan> <Nama lengkap kelas utama yang akan dijalankan> <Jalur tempat file yang akan dibaca disimpan> <Jalur tempat hasil disimpan>.

      Deskripsi parameter:

      Parameter

      Deskripsi

      Jalur tempat file JAR yang akan direferensikan dan dijalankan disimpan

      Jalur dalam format ossref://{endpoint}/{bucket}/{object}.

      • endpoint: titik akhir OSS. Jika parameter endpoint dibiarkan kosong, hanya sumber daya di bucket OSS yang berada di wilayah yang sama dengan kluster EMR saat ini yang dapat direferensikan.

      • bucket: wadah yang digunakan untuk menyimpan objek di OSS. Setiap bucket memiliki nama unik. Anda dapat masuk ke Konsol OSS untuk melihat semua buckets dalam akun login saat ini.

      • object: nama file atau jalur yang disimpan di bucket.

    (Opsional) Konfigurasikan parameter lanjutan

    Anda dapat mengonfigurasi parameter yang spesifik untuk tabel berikut di DataWorks Parameters di bawah EMR Node Parameters di tab Properties di sisi kanan node.

    Catatan
    • Tabel berikut menjelaskan parameter lanjutan yang dapat dikonfigurasi untuk berbagai jenis kluster EMR.

    • Untuk parameter Spark sumber terbuka tambahan open-source Spark parameters, konfigurasikan mereka di Spark Parameters di bawah EMR Node Parameters di tab Properties di sisi kanan node.

    Kluster DataLake atau kluster kustom: dibuat di halaman EMR on ECS

    Parameter lanjutan

    Deskripsi

    queue

    Antrian penjadwalan tempat pekerjaan dikomit. Nilai default: default. Untuk informasi tentang EMR YARN, lihat Penjadwal YARN.

    priority

    Prioritas. Nilai default: 1.

    FLOW_SKIP_SQL_ANALYZE

    Cara pernyataan SQL dieksekusi. Nilai valid:

    • true: Beberapa pernyataan SQL dieksekusi sekaligus.

    • false (default): Hanya satu pernyataan SQL yang dieksekusi pada satu waktu.

    Catatan

    Parameter ini hanya tersedia untuk pengujian di lingkungan pengembangan ruang kerja DataWorks.

    Lainnya

    Anda dapat menambahkan parameter kustom untuk node EMR MR sebagai parameter lanjutan dalam pengaturan lanjutan di konsol DataWorks. Saat Anda mengirim kode untuk node EMR MR di DataWorks, DataWorks secara otomatis menambahkan parameter kustom ke perintah dalam format -D key=value.

    Kluster Hadoop: dibuat di halaman EMR on ECS

    Parameter lanjutan

    Deskripsi

    queue

    Antrian penjadwalan tempat pekerjaan dikomit. Nilai default: default. Untuk informasi tentang EMR YARN, lihat Penjadwal YARN.

    priority

    Prioritas. Nilai default: 1.

    USE_GATEWAY

    Menentukan apakah akan menggunakan kluster gateway untuk mengirim pekerjaan di node saat ini. Nilai valid:

    • true: Gunakan kluster gateway untuk mengirim pekerjaan.

    • false (default): Jangan gunakan kluster gateway untuk mengirim pekerjaan. Pekerjaan secara otomatis dikirim ke node master.

    Catatan

    Jika kluster EMR yang dimiliki oleh node tidak terhubung dengan kluster gateway tetapi parameter USE_GATEWAY disetel ke true, pekerjaan mungkin gagal dikirim.

    Eksekusi pernyataan SQL

    1. Pada tab Debugging Configurations di bilah navigasi kanan tab konfigurasi node, atur parameter Computing Resource di bagian Computing Resource, serta konfigurasikan parameter Resource Group di bagian Konfigurasi DataWorks.

      Catatan
      • Anda juga dapat mengonfigurasi parameter CUs For Computing berdasarkan sumber daya yang diperlukan untuk eksekusi tugas. Nilai default parameter ini adalah 0.25.

      • Jika Anda ingin mengakses sumber data melalui Internet atau virtual private cloud (VPC), Anda harus menggunakan grup sumber daya untuk penjadwalan yang terhubung ke sumber data. Untuk informasi lebih lanjut, lihat Solusi konektivitas jaringan.

    2. Di bilah alat atas tab konfigurasi node, klik Run untuk mengeksekusi pernyataan SQL.

  2. Jika Anda ingin menjalankan tugas pada node secara berkala, konfigurasikan informasi penjadwalan sesuai dengan kebutuhan bisnis Anda.

  3. Setelah Anda mengonfigurasi node, sebarkan node tersebut. Untuk informasi lebih lanjut, lihat Penyebaran node atau alur kerja.

  4. Setelah Anda menyebarkan node, lihat status node di Pusat Operasi. Untuk informasi lebih lanjut, lihat Memulai dengan Pusat Operasi.

Lihat hasilnya

  • Masuk ke Konsol OSS. Kemudian, Anda dapat melihat hasilnya di direktori emr/datas/wordcount02/outputs tempat data awal disimpan.目标Bucket

  • Lihat hasil statistik di konsol DataWorks.

    1. Buat node EMR Hive. Untuk informasi lebih lanjut, lihat Pengembangan node.

    2. Di node EMR Hive, buat tabel eksternal Hive yang dipasang ke OSS. Kemudian, gunakan tabel eksternal Hive untuk membaca data dari tabel Hive di OSS. Contoh kode:

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'Kata',
          `count` STRING COMMENT 'Jumlah'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      Gambar berikut menunjukkan hasilnya.运行结果