全部产品
Search
文档中心

DataWorks:Buat node EMR MR

更新时间:Jul 09, 2025

Anda dapat membuat node EMR (E-MapReduce) MR untuk memproses dataset besar menggunakan beberapa tugas map paralel. Node ini membantu mempercepat komputasi paralel pada dataset besar. Topik ini menjelaskan cara membuat node EMR MR dan mengembangkan tugas di dalamnya. Dalam contoh ini, tugas digunakan untuk membaca data dari objek Object Storage Service (OSS) dalam Bucket OSS dan menghitung jumlah kata dalam objek tersebut.

Prasyarat

  • Kluster EMR terdaftar di DataWorks. Untuk informasi lebih lanjut, lihat DataStudio (versi lama): Asosiasikan Sumber Daya Komputasi EMR.

  • (Diperlukan jika Anda menggunakan pengguna RAM untuk mengembangkan tugas) Pengguna RAM ditambahkan ke ruang kerja DataWorks sebagai anggota dan diberi peran Develop atau Workspace Administrator. Peran Workspace Administrator memiliki lebih banyak izin daripada yang diperlukan. Berhati-hatilah saat memberikan peran ini. Untuk informasi lebih lanjut tentang cara menambahkan anggota, lihat Tambahkan Anggota Ruang Kerja dan Tetapkan Peran kepada Mereka.

  • Grup sumber daya serverless dibeli dan dikonfigurasi. Konfigurasinya mencakup asosiasi dengan ruang kerja dan konfigurasi jaringan. Untuk informasi lebih lanjut, lihat Buat dan Gunakan Grup Sumber Daya Serverless.

  • Alur kerja dibuat di DataStudio. Untuk informasi lebih lanjut, lihat Buat Alur Kerja.

  • Jika Anda ingin merujuk kode sumber terbuka di node EMR MR, pastikan bahwa kode tersebut diunggah sebagai sumber daya EMR JAR. Untuk informasi lebih lanjut, lihat Buat dan Gunakan Sumber Daya EMR.

  • Jika Anda ingin merujuk fungsi yang ditentukan pengguna (UDF) di node EMR MR, pastikan bahwa UDF diunggah sebagai sumber daya EMR JAR dan didaftarkan dengan EMR. Untuk informasi lebih lanjut tentang cara mendaftarkan UDF, lihat Buat Fungsi EMR.

  • Bucket OSS dibuat. Untuk menggunakan kode sampel untuk pengembangan tugas dalam topik ini, Anda harus menyiapkan bucket OSS. Untuk informasi lebih lanjut tentang cara membuat bucket OSS, lihat Buat Bucket.

Batasan

  • Jenis node ini hanya dapat dijalankan pada grup sumber daya serverless atau grup sumber daya eksklusif untuk penjadwalan. Kami merekomendasikan Anda menggunakan grup sumber daya serverless.

  • Jika Anda ingin mengelola metadata untuk DataLake atau kluster kustom di DataWorks, Anda harus mengonfigurasi EMR-HOOK di kluster Anda terlebih dahulu. Jika tidak, metadata tidak dapat ditampilkan secara real-time, log audit tidak dapat dihasilkan, dan garis keturunan data tidak dapat ditampilkan di DataWorks. Tugas tata kelola EMR juga tidak dapat dijalankan. Untuk informasi tentang cara mengonfigurasi EMR-HOOK, lihat Gunakan Fitur Ekstensi Hive untuk Mencatat Garis Keturunan Data dan Informasi Akses Historis.

Siapkan data awal dan paket sumber daya JAR

Siapkan data awal

Buat file bernama input01.txt yang berisi data awal berikut:

hadoop emr hadoop dw
hive hadoop
dw emr

Unggah file yang menyimpan data awal

  1. Masuk ke Konsol OSS. Di panel navigasi kiri, klik Buckets.

  2. Di halaman Bucket, temukan bucket yang diinginkan dan klik nama bucket untuk masuk ke halaman Objects.

    Dalam contoh ini, bucket onaliyun-bucket-2 digunakan.

  3. Di halaman Objek, klik Create Directory untuk membuat direktori yang digunakan untuk menyimpan data awal dan sumber daya JAR.

    • Atur Directory Name menjadi emr/datas/wordcount02/inputs untuk membuat direktori yang digunakan untuk menyimpan data awal.

    • Atur Directory Name menjadi emr/jars untuk membuat direktori yang digunakan untuk menyimpan sumber daya JAR.

  4. Unggah file yang menyimpan data awal ke direktori emr/datas/wordcount02/inputs.

    • Pergi ke direktori /emr/datas/wordcount02/inputs dan klik Upload Object.

    • Di bagian Files to Upload, klik Select Files dan unggah file input01.txt ke bucket.

Gunakan node EMR MR untuk membaca objek OSS dan menghasilkan paket JAR

  1. Buka proyek IntelliJ IDEA yang ada dan tambahkan dependensi Project Object Model (POM).

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!--Versi yang digunakan oleh EMR MR adalah 2.8.5.-->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. Konfigurasikan parameter berikut untuk membaca data dari dan menulis data ke objek OSS.

    Penting

    Akun Alibaba Cloud memiliki izin untuk memanggil semua operasi API. Jika pasangan AccessKey akun Alibaba Cloud Anda bocor, semua sumber daya di akun Anda mungkin terpapar risiko keamanan tinggi. Kami merekomendasikan agar Anda tidak menyimpan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda ke dalam kode proyek atau posisi yang mudah ditemukan. Sebagai gantinya, gunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin O&M. Contoh kode berikut disediakan hanya untuk referensi. Jaga kerahasiaan pasangan AccessKey akun Alibaba Cloud Anda.

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    Deskripsi Parameter:

    • ${accessKeyId}: ID AccessKey akun Alibaba Cloud Anda.

    • ${accessKeySecret}: Rahasia AccessKey akun Alibaba Cloud Anda.

    • ${endpoint}: Titik Akhir OSS. Titik akhir ditentukan oleh wilayah tempat kluster EMR Anda berada. Anda harus mengaktifkan OSS di wilayah tersebut. Untuk informasi lebih lanjut, lihat Wilayah OSO dan Titik Akhir.

    Dalam topik ini, kode Java digunakan untuk memodifikasi contoh WordCount di situs resmi Hadoop. Konfigurasi ID AccessKey dan Rahasia AccessKey ditambahkan ke kode untuk memberikan izin pekerjaan mengakses objek OSS.

    Contoh Kode

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
            conf.set("fs.oss.endpoint", "${endpoint}"); //
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ?  0 : 1);
        }
    }          
  3. Kemas kode ke dalam file JAR.

    Setelah Anda menulis dan menyimpan kode sebelumnya, kompres kode ke dalam paket JAR. Dalam contoh ini, paket bernama onaliyun_mr_wordcount-1.0-SNAPSHOT.jar dihasilkan.

Langkah 1: Buat node EMR MR

  1. Pergi ke halaman DataStudio.

    Masuk ke Konsol DataWorks. Di bilah navigasi atas, pilih wilayah yang diinginkan. Di panel navigasi kiri, pilih Data Development and O&M > Data Development. Pada halaman yang muncul, pilih ruang kerja yang diinginkan dari daftar drop-down dan klik Go to Data Development.

  2. Buat node EMR MR.

    1. Temukan alur kerja yang diinginkan, klik kanan nama alur kerja, dan pilih Create Node > EMR > EMR MR.

      Catatan

      Sebagai alternatif, Anda dapat mengarahkan pointer ke ikon Create dan pilih Create Node > EMR > EMR MR.

    2. Di kotak dialog Create Node, konfigurasikan parameter Name, Engine Instance, Node Type, dan Path. Klik Confirm. Tab konfigurasi node EMR MR muncul.

      Catatan

      Nama node hanya dapat berisi huruf, angka, garis bawah (_), dan titik (.).

Langkah 2: Kembangkan tugas EMR MR

Anda dapat menggunakan salah satu metode berikut berdasarkan kebutuhan bisnis Anda untuk mengembangkan tugas MR pada tab konfigurasi node EMR MR:

  • Unggah sumber daya dari mesin lokal Anda ke DataStudio dan kemudian merujuk sumber daya tersebut. Untuk informasi lebih lanjut, lihat bagian Metode 1: Unggah dan Rujuk Sumber Daya EMR JAR dalam topik ini. Kami merekomendasikan Anda menggunakan metode ini.

  • Gunakan metode OSS REF untuk merujuk sumber daya OSS. Untuk informasi lebih lanjut, lihat bagian Metode 2: Rujuk Sumber Daya OSS dalam topik ini.

Metode 1: Unggah dan rujuk sumber daya EMR JAR

DataWorks memungkinkan Anda mengunggah sumber daya dari mesin lokal Anda ke DataStudio sebelum Anda merujuk sumber daya tersebut. Jika node EMR MR bergantung pada sejumlah besar sumber daya, sumber daya tersebut tidak dapat diunggah menggunakan konsol DataWorks. Dalam kasus ini, Anda dapat menyimpan sumber daya di Hadoop Distributed File System (HDFS) dan merujuk sumber daya dalam kode node EMR MR.

  1. Buat sumber daya EMR JAR.

    Untuk informasi lebih lanjut tentang cara membuat sumber daya EMR JAR, lihat Buat dan Gunakan Sumber Daya EMR. Dalam contoh ini, paket JAR yang dihasilkan di bagian Siapkan Data Awal dan Paket Sumber Daya JAR disimpan di direktori emr/jars. Direktori ini digunakan untuk menyimpan sumber daya JAR. Pertama kali Anda menggunakan sumber daya EMR JAR, klik Authorize untuk memberi otorisasi DataWorks mengakses sumber daya EMR JAR. Kemudian, klik Upload untuk mengunggah sumber daya JAR.新建JAR资源

  2. Rujuk Paket JAR.

    1. Buka node EMR MR. Tab konfigurasi node muncul.

    2. Temukan sumber daya yang ingin dirujuk di bawah Resource di folder EMR, klik kanan nama sumber daya, dan pilih Insert Resource Path. Dalam contoh ini, sumber daya adalah onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.引用资源

    3. Jika pesan yang ditunjukkan pada gambar berikut muncul di tab konfigurasi node EMR MR, sumber daya kode dirujuk. Kemudian, jalankan kode berikut. Anda harus mengganti informasi dalam kode berikut dengan informasi aktual. Informasi tersebut mencakup nama paket sumber daya, nama bucket, dan direktori.

      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      Catatan

      Anda tidak dapat menambahkan komentar saat menulis kode untuk node EMR MR.

Metode 2: Rujuk sumber daya OSS

Node saat ini dapat merujuk sumber daya OSS dengan menggunakan metode OSS REF. Saat Anda menjalankan tugas pada node, DataWorks secara otomatis memuat sumber daya OSS yang ditentukan dalam kode node. Metode ini umum digunakan dalam skenario di mana dependensi JAR diperlukan dalam tugas EMR atau tugas EMR perlu bergantung pada skrip.

  1. Unggah File JAR.

    1. Masuk ke Konsol OSS. Di bilah navigasi atas, pilih wilayah yang diinginkan. Kemudian, di panel navigasi kiri, klik Buckets.

    2. Di halaman Bucket, temukan bucket yang diinginkan dan klik nama bucket untuk masuk ke halaman Objects.

      Dalam contoh ini, bucket onaliyun-bucket-2 digunakan.

    3. Unggah file JAR ke direktori yang telah dibuat.

      Pergi ke direktori emr/jars. Klik Upload Object. Di bagian Files to Upload, klik Select Files dan tambahkan file onaliyun_mr_wordcount-1.0-SNAPSHOT.jar. Kemudian, klik Upload Object.

  2. Rujuk File JAR.

    Tulis kode yang digunakan untuk merujuk file JAR.

    Di tab konfigurasi node EMR MR, tulis kode yang digunakan untuk merujuk file JAR.

    hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
    Catatan

    Perintah sebelumnya dalam format berikut: hadoop jar <Path tempat file JAR yang akan dirujuk dan dieksekusi disimpan> <Nama lengkap kelas utama yang akan dieksekusi> <Path tempat file yang akan dibaca disimpan> <Path tempat hasil disimpan>.

    Deskripsi Parameter:

    Parameter

    Deskripsi

    Path tempat file JAR yang akan dirujuk dan dieksekusi disimpan

    Path dalam format ossref://{endpoint}/{bucket}/{object}.

    • endpoint: titik akhir OSS. Jika parameter endpoint dibiarkan kosong, hanya sumber daya dalam bucket OSS yang berada di wilayah yang sama dengan kluster EMR saat ini yang dapat dirujuk.

    • bucket: wadah yang digunakan untuk menyimpan objek di OSS. Setiap bucket memiliki nama unik. Anda dapat masuk ke Konsol OSS untuk melihat semua buckets dalam akun login saat ini.

    • object: nama file atau path yang disimpan dalam bucket.

(Opsional) Konfigurasikan parameter lanjutan

Anda dapat mengonfigurasi parameter lanjutan di tab Pengaturan Lanjutan dari tab konfigurasi node saat ini. Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter, lihat Konfigurasi Spark. Tabel berikut menjelaskan parameter lanjutan yang dapat dikonfigurasi untuk berbagai jenis kluster EMR.

Kluster DataLake atau kluster kustom: dibuat di halaman EMR pada ECS

Parameter lanjutan

Deskripsi

queue

Antrian penjadwalan tempat tugas dikomit. Nilai default: default. Untuk informasi tentang EMR YARN, lihat Penjadwal YARN.

priority

Prioritas. Nilai default: 1.

Lainnya

Anda dapat menambahkan parameter khusus untuk node EMR MR sebagai parameter lanjutan di tab Pengaturan Lanjutan di Konsol DataWorks. Saat Anda mengirim kode untuk node EMR MR di DataWorks, DataWorks menambahkan parameter khusus ke perintah dalam format -D key=value.

Kluster Hadoop: dibuat di halaman EMR pada ECS

Parameter lanjutan

Deskripsi

queue

Antrian penjadwalan tempat tugas dikomit. Nilai default: default. Untuk informasi tentang EMR YARN, lihat Penjadwal YARN.

priority

Prioritas. Nilai default: 1.

USE_GATEWAY

Menentukan apakah akan menggunakan kluster gateway untuk mengirim tugas pada node saat ini. Nilai valid:

  • true: Gunakan kluster gateway untuk mengirim tugas.

  • false (default): Jangan gunakan kluster gateway untuk mengirim tugas. Tugas secara otomatis dikirim ke node master.

Catatan

Jika kluster EMR yang dimiliki oleh node tidak terhubung dengan kluster gateway tetapi parameter USE_GATEWAY diatur ke true, tugas mungkin gagal dikirim.

Jalankan tugas MR

  1. Di bilah alat, klik ikon 高级运行. Di kotak dialog Parameters, pilih grup sumber daya yang diinginkan dari daftar drop-down Nama Grup Sumber Daya dan klik Run.

    Catatan
    • Jika Anda ingin mengakses sumber daya komputasi melalui Internet atau virtual private cloud (VPC), Anda harus menggunakan grup sumber daya untuk penjadwalan yang terhubung ke sumber daya komputasi. Untuk informasi lebih lanjut, lihat Solusi Konektivitas Jaringan.

    • Jika Anda ingin mengubah grup sumber daya dalam operasi selanjutnya, Anda dapat mengklik ikon 高级运行 (Run with Parameters) untuk mengubah grup sumber daya di kotak dialog Parameter.

  2. Klik ikon 保存 di bilah alat atas untuk menyimpan pernyataan SQL.

  3. Opsional. Lakukan pengujian asap.

    Anda dapat melakukan pengujian asap pada node dalam lingkungan pengembangan saat Anda mengirim node atau setelah Anda mengirim node. Untuk informasi lebih lanjut, lihat Lakukan Pengujian Asap.

Langkah 3: Konfigurasikan properti penjadwalan

Jika Anda ingin sistem secara berkala menjalankan tugas pada node, Anda dapat mengklik Properties di panel navigasi kanan pada tab konfigurasi node untuk mengonfigurasi properti penjadwalan tugas berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut, lihat Ikhtisar.

Catatan

Anda harus mengonfigurasi parameter Rerun dan Parent Nodes di tab Properti sebelum Anda mengirim tugas.

Langkah 4: Terapkan tugas

Setelah tugas pada node dikonfigurasi, Anda harus mengirim dan menerapkan tugas tersebut. Setelah Anda mengirim dan menerapkan tugas, sistem menjalankan tugas secara berkala berdasarkan konfigurasi penjadwalan.

  1. Klik ikon 保存 di bilah alat atas untuk menyimpan tugas.

  2. Klik ikon 提交 di bilah alat atas untuk mengirim tugas.

    Di kotak dialog Submit, konfigurasikan parameter Change description. Kemudian, tentukan apakah akan meninjau kode tugas setelah Anda mengirim tugas berdasarkan kebutuhan bisnis Anda.

    Catatan
    • Anda harus mengonfigurasi parameter Rerun dan Parent Nodes di tab Properti sebelum Anda mengirim tugas.

    • Anda dapat menggunakan fitur tinjauan kode untuk memastikan kualitas kode tugas dan mencegah kesalahan eksekusi tugas yang disebabkan oleh kode tugas yang tidak valid. Jika Anda mengaktifkan fitur tinjauan kode, kode tugas yang dikirim hanya dapat diterapkan setelah kode tugas lulus tinjauan kode. Untuk informasi lebih lanjut, lihat Tinjauan Kode.

Jika Anda menggunakan ruang kerja dalam mode standar, Anda harus menerapkan tugas di lingkungan produksi setelah Anda mengirim tugas. Untuk menerapkan tugas pada node, klik Deploy di sudut kanan atas tab konfigurasi node. Untuk informasi lebih lanjut, lihat Terapkan Node.

Apa yang harus dilakukan selanjutnya

Setelah Anda mengirim dan menerapkan tugas, tugas dijalankan secara berkala berdasarkan konfigurasi penjadwalan. Anda dapat mengklik Operation Center di sudut kanan atas tab konfigurasi node yang sesuai untuk masuk ke Pusat Operasi dan melihat status penjadwalan tugas. Untuk informasi lebih lanjut, lihat Lihat dan Kelola Tugas yang Dipicu Otomatis.

Lihat hasilnya

  • Masuk ke Konsol OSS. Kemudian, Anda dapat melihat hasilnya di direktori emr/datas/wordcount02/outputs tempat data awal disimpan.目标Bucket

  • Lihat hasil statistik di Konsol DataWorks.

    1. Buat node EMR Hive. Untuk informasi lebih lanjut, lihat Buat Node EMR Hive.

    2. Di node EMR Hive, buat tabel eksternal Hive yang dipasang ke OSS. Kemudian, gunakan tabel eksternal Hive untuk membaca data dari tabel Hive di OSS. Contoh kode:

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'Kata',
          `count` STRING COMMENT 'Hitung'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      Gambar berikut menunjukkan hasilnya.运行结果