このトピックでは、E-MapReduce(EMR)Dataflow クラスタから EMR データレイク クラスタの HDFS サービスにデータを同期する方法について説明します。
前提条件
手順
- Flume サービスを構成します。
- Flume サービスの [構成] タブに移動します。
- 上部のナビゲーション バーで、クラスタが存在するリージョンを選択し、ビジネス要件に基づいてリソース グループを選択します。
- [EMR On ECS] ページで、管理するクラスタを見つけ、[アクション] 列の [サービス] をクリックします。
- [サービス] タブで、構成[flume] サービス セクションの をクリックします。
- [構成] タブで、[flume-conf.properties] サブタブをクリックします。この例では、グローバル構成が使用されています。ノードごとにクラスタを構成する場合は、独立ノード構成構成[flume] サービスの サブタブのドロップダウン リストから を選択できます。
- [flume-conf.properties] パラメータの値に次のコンテンツを追加します。説明
default-agent
パラメータの値は、エージェント名構成[flume] サービスの タブにある パラメータの値と同じである必要があります。default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group # シンクについて記述します default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = hdfs://emr-cluster/tmp/flume/test-data default-agent.sinks.k1.hdfs.fileType=DataStream # メモリ内にイベントをバッファリングするチャネルを使用します default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 # ソースとシンクをチャネルにバインドします default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
パラメータ 説明 default-agent.sources.source1.kafka.bootstrap.servers Dataflow クラスタ内のブローカーのサーバーとポート番号。 default-agent.sources.source1.kafka.topics Flume が Kafka データを消費するトピック。 default-agent.channels.c1.capacity チャネルに格納されるイベントの最大数。ビジネス要件に基づいて、このパラメータの値を変更します。 default-agent.channels.c1.transactionCapacity 各トランザクション チャネルがソースから受信するか、レシーバーに提供するイベントの最大数。ビジネス要件に基づいて、このパラメータの値を変更します。 default-agent.sinks.k1.hdfs.path Flume が HDFS にデータを書き込むパス。 一般的なクラスタと高可用性クラスタのサンプルコードを以下に示します。- 高可用性クラスタ
default-agent.sinks.k1.hdfs.path = hdfs://emr-cluster/tmp/flume/test-data
- 一般的なクラスタ
default-agent.sinks.k1.hdfs.path = hdfs://master-1-1:9000/tmp/flume/test-data
- 高可用性クラスタ
- 構成を保存します。
- 左下隅にある [保存] をクリックします。
- 表示されるダイアログ ボックスで、実行理由を入力し、[保存] をクリックします。
- Flume サービスの [構成] タブに移動します。
- Flume サービスを開始します。
- Flume サービスの [ステータス] タブで、FlumeAgent コンポーネントを見つけ、[アクション] 列で を選択します。
- 表示されるダイアログ ボックスで、実行理由を入力し、[OK] をクリックします。
- [確認] メッセージで、[OK] をクリックします。
- データ同期をテストします。
- セキュア シェル(SSH)を使用して Dataflow クラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。
- flume-test という名前のトピックを作成するには、次のコマンドを実行します。
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
- テスト データを生成します。
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092
たとえば、
abc
と入力し、Enter キーを押します。 - SSH を使用してデータレイク クラスタに接続し、生成されたファイルを表示します。Flume は、HDFS に FlumeData.xxxx という名前のファイルを生成します。xxxx は、ミリ秒単位の現在のタイムスタンプを示します。ファイルの内容は、テスト データ
abc
です。hdfs dfs -cat /tmp/flume/test-data/<FlumeData.xxxx>