すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:EMR Kafka クラスタから HDFS へのデータの同期

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)Dataflow クラスタから EMR データレイク クラスタの HDFS サービスにデータを同期する方法について説明します。

前提条件

  • EMR データレイク クラスタが作成され、クラスタ作成時にオプション サービスから Flume が選択されています。詳細については、「クラスタの作成」をご参照ください。
  • EMR Dataflow クラスタが作成され、クラスタ作成時にオプション サービスから Kafka サービスが選択されています。詳細については、「クラスタの作成」をご参照ください。

手順

  1. Flume サービスを構成します。
    1. Flume サービスの [構成] タブに移動します。
      1. 上部のナビゲーション バーで、クラスタが存在するリージョンを選択し、ビジネス要件に基づいてリソース グループを選択します
      2. [EMR On ECS] ページで、管理するクラスタを見つけ、[アクション] 列の [サービス] をクリックします。
      3. [サービス] タブで、構成[flume] サービス セクションの をクリックします。
    2. [構成] タブで、[flume-conf.properties] サブタブをクリックします。
      この例では、グローバル構成が使用されています。ノードごとにクラスタを構成する場合は、独立ノード構成構成[flume] サービスの サブタブのドロップダウン リストから を選択できます。
    3. [flume-conf.properties] パラメータの値に次のコンテンツを追加します。
      説明 default-agent パラメータの値は、エージェント名構成[flume] サービスの タブにある パラメータの値と同じである必要があります。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      # シンクについて記述します
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = hdfs://emr-cluster/tmp/flume/test-data
      default-agent.sinks.k1.hdfs.fileType=DataStream
      
      # メモリ内にイベントをバッファリングするチャネルを使用します
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      # ソースとシンクをチャネルにバインドします
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      パラメータ説明
      default-agent.sources.source1.kafka.bootstrap.serversDataflow クラスタ内のブローカーのサーバーとポート番号。
      default-agent.sources.source1.kafka.topicsFlume が Kafka データを消費するトピック。
      default-agent.channels.c1.capacityチャネルに格納されるイベントの最大数。ビジネス要件に基づいて、このパラメータの値を変更します。
      default-agent.channels.c1.transactionCapacity各トランザクション チャネルがソースから受信するか、レシーバーに提供するイベントの最大数。ビジネス要件に基づいて、このパラメータの値を変更します。
      default-agent.sinks.k1.hdfs.pathFlume が HDFS にデータを書き込むパス。
      一般的なクラスタと高可用性クラスタのサンプルコードを以下に示します。
      • 高可用性クラスタ
        default-agent.sinks.k1.hdfs.path = hdfs://emr-cluster/tmp/flume/test-data
      • 一般的なクラスタ
        default-agent.sinks.k1.hdfs.path = hdfs://master-1-1:9000/tmp/flume/test-data
    4. 構成を保存します。
      1. 左下隅にある [保存] をクリックします。
      2. 表示されるダイアログ ボックスで、実行理由を入力し、[保存] をクリックします。
  2. Flume サービスを開始します。
    1. Flume サービスの [ステータス] タブで、FlumeAgent コンポーネントを見つけ、[アクション] 列で [詳細] > [再起動] を選択します。
    2. 表示されるダイアログ ボックスで、実行理由を入力し、[OK] をクリックします。
    3. [確認] メッセージで、[OK] をクリックします。
  3. データ同期をテストします。
    1. セキュア シェル(SSH)を使用して Dataflow クラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。
    2. flume-test という名前のトピックを作成するには、次のコマンドを実行します。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. テスト データを生成します。
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      たとえば、abc と入力し、Enter キーを押します。

    4. SSH を使用してデータレイク クラスタに接続し、生成されたファイルを表示します。
      Flume は、HDFS に FlumeData.xxxx という名前のファイルを生成します。xxxx は、ミリ秒単位の現在のタイムスタンプを示します。ファイルの内容は、テスト データ abc です。
      hdfs dfs -cat /tmp/flume/test-data/<FlumeData.xxxx>