このトピックでは、Flume を使用して E-MapReduce (EMR) Kafka クラスターから Alibaba Cloud OSS-HDFS サービスにデータを同期する方法について説明します。
前提条件
OSS-HDFS サービスが有効化され、承認されていること。 詳細については、「OSS-HDFS サービスを有効にする」をご参照ください。
DataLake クラスターを作成し、Flume サービスを選択していること。 詳細については、「クラスターの作成」をご参照ください。
DataFlow クラスターを作成し、Kafka サービスを選択していること。 詳細については、「クラスターの作成」をご参照ください。
手順
Flume を構成します。
Flume の構成ページに移動します。
EMR コンソールにログインします。 左側のナビゲーションウィンドウで、[EMR on ECS] をクリックします。
必要に応じて、トップメニューバーでリージョンとリソースグループを選択します。
[EMR On ECS] ページで、対象クラスターの [操作] 列にある [クラスターサービス] をクリックします。
[クラスターサービス] タブで、[FLUME] サービスセクションの [設定] をクリックします。
JVM の最大利用可能メモリ (Xmx) を設定します。
Flume は、OSS-HDFS にデータを書き込む際に、大量の Java 仮想マシン (JVM) メモリを消費します。 次のステップを実行して、Flume エージェントの Xmx 値を増やすことができます。
[flume-env.sh] タブをクリックします。
このトピックでは、グローバル構成メソッドについて説明します。 ノードごとに構成する場合は、[FLUME] サービスの [設定] ページのドロップダウンリストから [独立ノード設定] を選択します。
[JAVA_OPTS] パラメーターの値を変更します。
たとえば、JVM の最大利用可能メモリを 1 GB に設定するには、パラメーター値を -Xmx1g に変更します。
[保存] をクリックします。
[flume-conf.properties] 構成を変更します。
[flume-conf.properties] タブをクリックします。
このトピックでは、グローバル構成メソッドについて説明します。 ノードごとに構成する場合は、[FLUME] サービスの [設定] ページのドロップダウンリストから [独立ノード設定] を選択します。
[flume-conf.properties] のエディターで、次の設定項目を追加します。
説明次の例の [default-agent] の値は、[FLUME] サービスの [設定] ページの [agent_name] パラメーターの値と同じである必要があります。
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path} default-agent.sinks.k1.hdfs.fileType=DataStream # Use a channel which buffers events in memory default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1パラメーター
説明
default-agent.sources.source1.kafka.bootstrap.servers
Kafka クラスター内のブローカーのホスト名とポート番号。
default-agent.sinks.k1.hdfs.path
OSS-HDFS のパス。 フォーマットは oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path} です。 値の例は oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result です。
パラメーターは次のように説明されます。
{yourBucketName}: OSS-HDFS サービスが有効になっているバケットの名前。
{yourBucketRegion}: バケットが配置されているリージョン ID。
{path}: OSS-HDFS サービスのディレクトリ名。
default-agent.channels.c1.capacity
チャンネルに保存されるイベントの最大数。 ご使用の環境に基づいて、このパラメーター値を変更します。
default-agent.channels.c1.transactionCapacity
各トランザクションチャンネルがソースから受信するか、シンクに提供するイベントの最大数。 ご使用の環境に基づいて、このパラメーター値を変更します。
[保存] をクリックします。
データ同期をテストします。
- Secure Shell (SSH) を使用して Dataflow クラスターにログインします。 詳細については、「クラスターへのログイン」をご参照ください。
- 次のコマンドを実行して、flume-test という名前の Topic を作成します。
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create テストデータを生成します。
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092たとえば、
abcと入力して Enter キーを押します。oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result パスに FlumeData.xxxx という名前のファイルが生成されます。 ファイル名の xxxx は、ファイルが生成されたときのタイムスタンプ (ミリ秒) を表します。