すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:Flume を使用して EMR Kafka クラスターから OSS-HDFS にデータを同期する

最終更新日:Nov 09, 2025

このトピックでは、Flume を使用して E-MapReduce (EMR) Kafka クラスターから Alibaba Cloud OSS-HDFS サービスにデータを同期する方法について説明します。

前提条件

  • OSS-HDFS サービスが有効化され、承認されていること。 詳細については、「OSS-HDFS サービスを有効にする」をご参照ください。

  • DataLake クラスターを作成し、Flume サービスを選択していること。 詳細については、「クラスターの作成」をご参照ください。

  • DataFlow クラスターを作成し、Kafka サービスを選択していること。 詳細については、「クラスターの作成」をご参照ください。

手順

  1. Flume を構成します。

    1. Flume の構成ページに移動します。

      1. EMR コンソールにログインします。 左側のナビゲーションウィンドウで、[EMR on ECS] をクリックします。

      2. 必要に応じて、トップメニューバーでリージョンとリソースグループを選択します。

      3. [EMR On ECS] ページで、対象クラスターの [操作] 列にある [クラスターサービス] をクリックします。

      4. [クラスターサービス] タブで、[FLUME] サービスセクションの [設定] をクリックします。

    2. JVM の最大利用可能メモリ (Xmx) を設定します。

      Flume は、OSS-HDFS にデータを書き込む際に、大量の Java 仮想マシン (JVM) メモリを消費します。 次のステップを実行して、Flume エージェントの Xmx 値を増やすことができます。

      1. [flume-env.sh] タブをクリックします。

        このトピックでは、グローバル構成メソッドについて説明します。 ノードごとに構成する場合は、[FLUME] サービスの [設定] ページのドロップダウンリストから [独立ノード設定] を選択します。

      2. [JAVA_OPTS] パラメーターの値を変更します。

        たとえば、JVM の最大利用可能メモリを 1 GB に設定するには、パラメーター値を -Xmx1g に変更します。

      3. [保存] をクリックします。

    3. [flume-conf.properties] 構成を変更します。

      1. [flume-conf.properties] タブをクリックします。

        このトピックでは、グローバル構成メソッドについて説明します。 ノードごとに構成する場合は、[FLUME] サービスの [設定] ページのドロップダウンリストから [独立ノード設定] を選択します。

      2. [flume-conf.properties] のエディターで、次の設定項目を追加します。

        説明

        次の例の [default-agent] の値は、[FLUME] サービスの [設定] ページの [agent_name] パラメーターの値と同じである必要があります。

        default-agent.sources = source1
        default-agent.sinks = k1
        default-agent.channels = c1
        
        default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
        default-agent.sources.source1.channels = c1
        default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
        default-agent.sources.source1.kafka.topics = flume-test
        default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
        
        default-agent.sinks.k1.type = hdfs
        default-agent.sinks.k1.hdfs.path = oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}
        default-agent.sinks.k1.hdfs.fileType=DataStream
        
        # Use a channel which buffers events in memory
        default-agent.channels.c1.type = memory
        default-agent.channels.c1.capacity = 100
        default-agent.channels.c1.transactionCapacity = 100
        
        # Bind the source and sink to the channel
        default-agent.sources.source1.channels = c1
        default-agent.sinks.k1.channel = c1

        パラメーター

        説明

        default-agent.sources.source1.kafka.bootstrap.servers

        Kafka クラスター内のブローカーのホスト名とポート番号。

        default-agent.sinks.k1.hdfs.path

        OSS-HDFS のパス。 フォーマットは oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path} です。 値の例は oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result です。

        パラメーターは次のように説明されます。

        • {yourBucketName}: OSS-HDFS サービスが有効になっているバケットの名前。

        • {yourBucketRegion}: バケットが配置されているリージョン ID。

        • {path}: OSS-HDFS サービスのディレクトリ名。

        default-agent.channels.c1.capacity

        チャンネルに保存されるイベントの最大数。 ご使用の環境に基づいて、このパラメーター値を変更します。

        default-agent.channels.c1.transactionCapacity

        各トランザクションチャンネルがソースから受信するか、シンクに提供するイベントの最大数。 ご使用の環境に基づいて、このパラメーター値を変更します。

      3. [保存] をクリックします。

  2. データ同期をテストします。

    1. Secure Shell (SSH) を使用して Dataflow クラスターにログインします。 詳細については、「クラスターへのログイン」をご参照ください。
    2. 次のコマンドを実行して、flume-test という名前の Topic を作成します。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. テストデータを生成します。

      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      たとえば、abc と入力して Enter キーを押します。

      oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result パスに FlumeData.xxxx という名前のファイルが生成されます。 ファイル名の xxxx は、ファイルが生成されたときのタイムスタンプ (ミリ秒) を表します。