全部产品
Search
文档中心

Object Storage Service:gunakan flume untuk menyinkronkan data dari kluster emr kafka ke oss-hdfs

更新时间:Nov 09, 2025

Topik ini menjelaskan cara menggunakan Flume untuk menyinkronkan data dari kluster E-MapReduce (EMR) Kafka ke layanan Alibaba Cloud OSS-HDFS.

Prasyarat

  • Layanan OSS-HDFS telah diaktifkan dan diberi otorisasi. Untuk informasi lebih lanjut, lihat Aktifkan Layanan OSS-HDFS.

  • Anda telah membuat kluster DataLake dan memilih layanan Flume. Untuk informasi lebih lanjut, lihat Buat Kluster.

  • Anda telah membuat kluster DataFlow dan memilih layanan Kafka. Untuk informasi lebih lanjut, lihat Buat Kluster.

Prosedur

  1. Konfigurasikan Flume.

    1. Pergi ke halaman konfigurasi Flume.

      1. Masuk ke Konsol EMR. Di panel navigasi sebelah kiri, klik EMR on ECS.

      2. Di bilah menu atas, pilih Wilayah dan kelompok sumber daya sesuai kebutuhan.

      3. Pada halaman EMR on ECS, klik Cluster Service di kolom Tindakan untuk kluster target.

      4. Pada tab Cluster Service, klik Configure di bagian layanan FLUME.

    2. Atur memori maksimum yang tersedia (Xmx) untuk JVM.

      Flume mengonsumsi banyak memori Java Virtual Machine (JVM) saat menulis data ke OSS-HDFS. Anda dapat meningkatkan nilai Xmx untuk agen Flume dengan melakukan langkah-langkah berikut:

      1. Klik tab flume-env.sh.

        Topik ini menjelaskan metode konfigurasi global. Jika Anda ingin mengonfigurasi per node, pilih Independent Node Configuration dari daftar drop-down pada halaman Configuration layanan FLUME.

      2. Ubah nilai parameter JAVA_OPTS.

        Sebagai contoh, untuk mengatur memori maksimum JVM menjadi 1 GB, ubah nilai parameter menjadi -Xmx1g.

      3. Klik Simpan.

    3. Modifikasi konfigurasi flume-conf.properties.

      1. Klik tab flume-conf.properties.

        Topik ini menjelaskan metode konfigurasi global. Jika Anda ingin mengonfigurasi per node, pilih Independent Node Configuration dari daftar drop-down pada halaman Configuration layanan FLUME.

      2. Di editor untuk flume-conf.properties, tambahkan item konfigurasi berikut.

        Catatan

        Nilai default-agent dalam contoh berikut harus sama dengan nilai parameter agent_name pada halaman Configuration layanan FLUME.

        default-agent.sources = source1
        default-agent.sinks = k1
        default-agent.channels = c1
        
        default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
        default-agent.sources.source1.channels = c1
        default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
        default-agent.sources.source1.kafka.topics = flume-test
        default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
        
        default-agent.sinks.k1.type = hdfs
        default-agent.sinks.k1.hdfs.path = oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}
        default-agent.sinks.k1.hdfs.fileType=DataStream
        
        # Gunakan saluran yang menyangga event di memori
        default-agent.channels.c1.type = memory
        default-agent.channels.c1.capacity = 100
        default-agent.channels.c1.transactionCapacity = 100
        
        # Hubungkan sumber dan sink ke saluran
        default-agent.sources.source1.channels = c1
        default-agent.sinks.k1.channel = c1

        Parameter

        Deskripsi

        default-agent.sources.source1.kafka.bootstrap.servers

        Nama host dan nomor port broker di kluster Kafka.

        default-agent.sinks.k1.hdfs.path

        Path OSS-HDFS. Formatnya adalah oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}. Contoh nilainya adalah oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result.

        Parameter dijelaskan sebagai berikut:

        • {yourBucketName}: Nama bucket tempat layanan OSS-HDFS diaktifkan.

        • {yourBucketRegion}: ID wilayah tempat bucket berada.

        • {path}: Nama direktori untuk layanan OSS-HDFS.

        default-agent.channels.c1.capacity

        Jumlah maksimum event yang disimpan di saluran. Ubah nilai parameter ini berdasarkan lingkungan Anda.

        default-agent.channels.c1.transactionCapacity

        Jumlah maksimum event yang diterima setiap saluran transaksi dari sumber atau diberikan ke sink. Ubah nilai parameter ini berdasarkan lingkungan Anda.

      3. Klik Simpan.

  2. Uji sinkronisasi data.

    1. Gunakan Secure Shell (SSH) untuk masuk ke kluster Dataflow. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.
    2. Jalankan perintah berikut untuk membuat topik bernama flume-test:
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. Hasilkan data uji.

      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      Sebagai contoh, masukkan abc dan tekan Enter.

      File bernama FlumeData.xxxx dibuat di path oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result. Dalam nama file, xxxx mewakili timestamp dalam milidetik ketika file tersebut dibuat.