全部产品
Search
文档中心

DataWorks:EMR MR Node

更新时间:Feb 05, 2026

Tutorial ini menjelaskan cara mengembangkan dan mengonfigurasi pekerjaan MapReduce (MR) pada Node E-MapReduce (EMR), menggunakan contoh penghitungan kata yang membaca file teks dari Object Storage Service (OSS). Dengan membuat Node MR, Anda dapat membagi dataset besar menjadi beberapa tugas Map paralel untuk meningkatkan efisiensi pemrosesan data secara signifikan.

Prasyarat

  • Anda telah membuat Kluster Alibaba Cloud E-MapReduce (EMR) dan mendaftarkannya ke DataWorks. Untuk informasi selengkapnya, lihat Data Studio: Mengaitkan resource komputasi EMR.

  • (Opsional, wajib bagi pengguna RAM) Tambahkan pengguna Resource Access Management (RAM) yang bertanggung jawab atas pengembangan tugas ke Workspace dan berikan peran Developer atau Workspace Administrator. Peran Workspace Administrator memiliki izin yang luas, sehingga berikan dengan hati-hati. Untuk informasi selengkapnya tentang penambahan anggota, lihat Menambahkan anggota ke workspace.

    Jika Anda menggunakan akun Alibaba Cloud, Anda dapat melewati langkah ini.
  • Untuk mengikuti contoh dalam topik ini, Anda harus terlebih dahulu membuat Bucket OSS. Untuk informasi selengkapnya, lihat Membuat bucket.

Batasan

  • Jenis tugas ini hanya dapat dijalankan pada Serverless Resource Group (disarankan) atau Exclusive Scheduling Resource Group.

  • Untuk mengelola Metadata di DataWorks untuk DataLake atau kluster kustom, Anda harus terlebih dahulu mengonfigurasi EMR-HOOK pada kluster tersebut. Untuk informasi selengkapnya, lihat Mengonfigurasi Hive EMR-HOOK.

    Catatan

    Jika EMR-HOOK tidak dikonfigurasi pada kluster, DataWorks tidak dapat menampilkan Metadata secara real time, menghasilkan log audit, menampilkan Data Lineage, atau melakukan tugas tata kelola data terkait EMR.

Siapkan data sumber dan paket JAR

Siapkan data sumber

Buat file contoh bernama input01.txt dengan konten berikut.

hadoop emr hadoop dw
hive hadoop
dw emr

Unggah data sumber

  1. Login ke konsol OSS. Di panel navigasi sebelah kiri, klik Bucket List.

  2. Klik nama Bucket target untuk masuk ke halaman File Management.

    Tutorial ini menggunakan onaliyun-bucket-2 sebagai contoh Bucket.

  3. Klik Create Directory untuk membuat direktori bagi data sumber dan resource JAR.

    • Atur Directory Name menjadi emr/datas/wordcount02/inputs untuk membuat direktori bagi data sumber.

    • Atur Directory Name menjadi emr/jars untuk membuat direktori bagi resource JAR.

  4. Unggah file data sumber ke direktori tersebut.

    • Arahkan ke path /emr/datas/wordcount02/inputs dan klik Upload File.

    • Pada bagian Files to Upload, klik Scan for Files, tambahkan file input01.txt ke Bucket, lalu klik Upload File.

Hasilkan JAR MapReduce untuk OSS

  1. Buka proyek IDEA Anda dan tambahkan dependensi pom.

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!-- Versi EMR MR adalah 2.8.5. -->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. Untuk membaca dan menulis file OSS dalam MapReduce, konfigurasikan parameter berikut.

    Penting

    Peringatan: AccessKey akun Alibaba Cloud memberikan izin penuh untuk semua operasi API. Kami menyarankan menggunakan pengguna RAM untuk panggilan API dan operasi harian. Jangan menyematkan ID AccessKey dan Secret AccessKey Anda secara langsung dalam kode proyek atau lokasi publik lainnya. Kebocoran AccessKey akan membahayakan keamanan seluruh resource dalam akun Anda. Contoh kode berikut hanya untuk referensi. Simpan kredensial AccessKey Anda secara aman dan tangani dengan hati-hati.

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    Deskripsi parameter:

    • ${accessKeyId}: ID AccessKey akun Alibaba Cloud Anda.

    • ${accessKeySecret}: Secret AccessKey akun Alibaba Cloud Anda.

    • ${endpoint}: Titik akhir untuk mengakses OSS. Titik akhir ditentukan oleh Region tempat kluster Anda berada. Bucket OSS harus berada di Region yang sama dengan kluster. Untuk informasi selengkapnya, lihat Region dan titik akhir.

    Kode Java berikut menyesuaikan contoh WordCount resmi Hadoop dengan menambahkan konfigurasi ID AccessKey dan Secret AccessKey untuk mengakses file OSS.

    Kode contoh

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
            conf.set("fs.oss.endpoint", "${endpoint}"); //
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
                                    
  3. Setelah mengedit kode Java, buat paket JAR. Contoh ini menggunakan onaliyun_mr_wordcount-1.0-SNAPSHOT.jar sebagai nama paket JAR.

Prosedur

  1. Pada halaman konfigurasi node EMR MR, kembangkan pekerjaan tersebut.

    Kembangkan tugas EMR MR

    Pilih salah satu pendekatan berikut berdasarkan kasus penggunaan Anda:

    Unggah dan referensikan

    DataWorks memungkinkan Anda mengunggah resource lokal ke DataStudio lalu mereferensikannya dalam suatu Node. Jika resource yang dibutuhkan oleh Node EMR MR terlalu besar untuk diunggah melalui antarmuka DataWorks, Anda dapat menyimpan resource tersebut di Object Storage Service (OSS) dan mereferensikannya dalam kode Anda.

    1. Buat resource JAR EMR.

      1. Untuk informasi selengkapnya, lihat manajemen resource. Simpan paket JAR yang dihasilkan di Siapkan data sumber dan paket JAR ke direktori resource JAR emr/jars. Klik Click to upload untuk mengunggah resource JAR.

      2. Pilih Storage Path, Data Source, dan Resource Group.

      3. Klik Save.

      image

    2. Referensikan resource JAR EMR.

      1. Buka Node EMR MR yang telah dibuat dan masuk ke halaman editor kode.

      2. Di panel manajemen resource di sebelah kiri, temukan resource yang ingin direferensikan, misalnya onaliyun_mr_wordcount-1.0-SNAPSHOT.jar, klik kanan, lalu pilih Reference Resource.

      3. Setelah mereferensikan resource, pesan konfirmasi akan muncul di editor kode. Kemudian, masukkan perintah berikut. Nama paket JAR, nama Bucket, dan path dalam perintah ini hanyalah contoh. Gantilah dengan nilai aktual Anda.

        ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
        onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
        Catatan

        Pernyataan komentar tidak didukung di editor Node EMR MR.

    Referensikan dari OSS

    Anda dapat langsung mereferensikan resource OSS dari Node menggunakan OSS REF. Saat Node EMR dijalankan, DataWorks secara otomatis mengunduh resource OSS yang direferensikan ke lingkungan eksekusi lokal. Pendekatan ini berguna ketika pekerjaan EMR memiliki dependensi JAR atau bergantung pada file skrip lainnya.

    1. Unggah resource JAR.

      1. Setelah selesai mengembangkan kode, login ke konsol OSS dan klik Bucket List di panel navigasi sebelah kiri untuk Region Anda.

      2. Klik nama Bucket target untuk masuk ke halaman File Management.

        Tutorial ini menggunakan onaliyun-bucket-2 sebagai contoh Bucket.

      3. Unggah resource JAR ke direktorinya.

        Arahkan ke direktori emr/jars, klik Upload File, lalu pada bagian Files to Upload, klik Scan for Files. Tambahkan file onaliyun_mr_wordcount-1.0-SNAPSHOT.jar ke Bucket dan klik Upload File.

    2. Referensikan resource JAR.

      Pada halaman konfigurasi Node EMR MR yang telah dibuat, edit kode untuk mereferensikan resource JAR tersebut.

      Pada halaman konfigurasi Node EMR MR yang telah dibuat, edit kode untuk mereferensikan resource JAR tersebut.

      hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      Catatan

      Format perintahnya adalah hadoop jar <path_to_the_JAR_to_run> <fully_qualified_main_class_name> <input_directory> <output_directory>.

      Tabel berikut menjelaskan path ke JAR yang akan dijalankan.

      Parameter

      Deskripsi

      Path to the JAR to run

      Format: ossref://{endpoint}/{Bucket}/{object}

      • endpoint: Titik akhir untuk mengakses OSS. Jika parameter ini dikosongkan, Anda hanya dapat mengakses OSS di Region yang sama dengan kluster EMR. Bucket OSS harus berada di Region yang sama dengan kluster EMR.

      • Bucket: Kontainer OSS untuk menyimpan objek. Setiap Bucket memiliki nama unik. Anda dapat login ke konsol OSS untuk melihat semua Buckets di bawah akun Anda.

      • object: Objek tertentu, seperti nama file atau path, yang disimpan di Bucket.

    (Opsional) Konfigurasi parameter lanjutan

    Anda dapat mengonfigurasi parameter tertentu di bagian EMR Node Parameter or DataWorks Parameter di bawah di sisi kanan panel konfigurasi node.

    Catatan
    • Parameter lanjutan yang tersedia sedikit berbeda tergantung pada jenis kluster EMR, seperti yang ditunjukkan pada tabel berikut.

    • Properti Spark open source tambahan dapat dikonfigurasi di bagian EMR Node Parameter or DataWorks Parameter di bawah di sisi kanan panel konfigurasi node.

    Kluster Datalake atau kustom

    Parameter lanjutan

    Deskripsi

    queue

    Menentukan antrian penjadwalan untuk mengirimkan pekerjaan. Nilai default-nya adalah antrian default. Untuk informasi tentang EMR YARN, lihat Konfigurasi antrian dasar.

    priority

    Prioritas. Nilai default-nya adalah 1.

    FLOW_SKIP_SQL_ANALYZE

    Menentukan metode eksekusi untuk pernyataan SQL. Nilai yang valid:

    • true: Eksekusi beberapa pernyataan SQL sekaligus.

    • false (default): Eksekusi satu pernyataan SQL dalam satu waktu.

    Catatan

    Parameter ini hanya dapat digunakan untuk pengujian di lingkungan pengembangan data.

    Others

    Anda juga dapat menambahkan parameter pekerjaan MR kustom di bagian konfigurasi lanjutan. Saat Anda mengirimkan kode, DataWorks secara otomatis menambahkan parameter baru tersebut ke perintah menggunakan pernyataan -D key=value.

    Kluster Hadoop

    Parameter lanjutan

    Deskripsi

    queue

    Menentukan antrian penjadwalan untuk mengirimkan pekerjaan. Nilai default-nya adalah antrian default. Untuk informasi tentang EMR YARN, lihat Konfigurasi antrian dasar.

    priority

    Prioritas. Nilai default-nya adalah 1.

    USE_GATEWAY

    Menentukan apakah pekerjaan dikirim melalui kluster gateway. Nilai yang valid:

    • true: Kirim pekerjaan melalui kluster gateway.

    • false (default): Jangan kirim pekerjaan melalui kluster gateway. Pekerjaan dikirim ke node head secara default.

    Catatan

    Jika kluster tempat node ini berada tidak dikaitkan dengan kluster gateway dan Anda mengatur parameter ini ke true, pengiriman pekerjaan EMR akan gagal.

    Jalankan tugas

    1. Pada bagian Run Configuration Compute Resources, konfigurasikan Compute Resources dan Resource Group.

      Catatan
      • Anda juga dapat mengonfigurasi Scheduling CUs berdasarkan kebutuhan resource tugas. Nilai CU default-nya adalah 0.25.

      • Untuk mengakses sumber data melalui jaringan publik atau VPC, Anda harus menggunakan Resource Group penjadwalan yang telah lulus uji konektivitas dengan Sumber data. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.

    2. Pada kotak dialog parameter di bilah alat, pilih Sumber data yang telah dibuat lalu klik Run.

  2. Untuk menjalankan tugas Node secara berkala, konfigurasikan properti penjadwalan. Untuk informasi selengkapnya, lihat Konfigurasi penjadwalan node.

  3. Setelah mengonfigurasi tugas Node, Anda harus menerapkan Node tersebut. Untuk informasi selengkapnya, lihat Penerapan node dan workflow.

  4. Setelah tugas diterapkan, Anda dapat melihat statusnya di Operation Center. Untuk informasi selengkapnya, lihat Memulai Operation Center.

Lihat hasil

  • Login ke konsol OSS. Anda dapat melihat file output di direktori output yang ditentukan pada Bucket target. Dalam contoh ini, path-nya adalah emr/datas/wordcount02/outputs.target Bucket

  • Baca statistik di DataWorks.

    1. Buat Node EMR Hive. Untuk informasi selengkapnya, lihat Membuat node untuk workflow terjadwal.

    2. Pada Node EMR Hive, buat Tabel Hive Eksternal yang dipasang pada OSS dan baca data tabel tersebut. Kode contoh:

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'word',
          `cout` STRING COMMENT 'count'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      Gambar berikut menunjukkan hasilnya.hasil eksekusi