このトピックでは、EMR-Flume を使用して Log Service のデータをEMR クラスターの HDFS に移行し、レコードのタイムスタンプに基づいてデータをパーティションに格納する方法について説明します。
概要
EMR は、V3.20.0 以降、Log Service ソースを備えた EMR-Flume を特徴としています。 Logtail などの Log Service のツールを使用してデータを LogHub に移動し、EMR-Flume を使用してデータを EMR クラスターの HDFS に移動できます。 詳細は、「収集方法」をご参照ください。
事前準備
Hadoop クラスターを作成し、オプションのサービスから Flume を選択します。 詳細は、「クラスターの作成」をご参照ください。
Flume の設定
- Source の設定
設定項目 値 説明 type org.apache.flume.source.loghub.LogHubSource endpoint LogHub のエンドポイント VPC またはクラシックネットワークエンドポイントを使用している場合は、VPC またはクラシックネットワークが EMR クラスターと同じリージョンに展開されていることをご確認ください。 パブリックネットワークエンドポイントを使用している場合は、Flume エージェントが実行するノードにパブリック IP アドレスが割り当てられていることをご確認ください。 project LogHub のプロジェクト logstore LogHub のログストア accessKeyId AccessKey ID accessKey AccessKey Secret useRecordTime true デフォルト値: false。 タイムスタンププロパティがヘッダーにない場合、イベントが受信された時刻はタイムスタンプとしてエンコードされ、ヘッダーに挿入されます。 Flume Agent が起動または停止したり、データの同期が遅れたりすると、データが間違ったパーティションに配置されます。 値を true に設定します。 true 値は、LogHub がデータを収集する時間をタイムスタンプとして使用することを示します。 consumerGroup consumer_1 コンシューマグループの名前。 デフォルト値:consumer_1。 その他の設定項目についての説明を以下に示します。- consumerPosition
コンシューマグループが LogHub データを初めて使用する位置。 デフォルト値: end (最新のデータを処理することを示します)。 有効値は、begin、special、end。begin は、コンシューマグループが最も古いデータから処理を開始することを示します。 special は、コンシューマグループが指定されたオフセットでデータ処理を開始することを示します。 値が special に設定されている場合は、startTime 設定アイテムを使用してオフセットを指定する必要があります。 単位: 秒。 LogHub サーバーは、最初のデータ処理後のコンシューマグループのコンシューマポジションを記録します。 consumerPosition 値を変更するには、LogHub データを使用するコンシューマグループのステータスを消去します。 詳細については、「コンシューマグループのステータス」をご参照ください。 また、consumerGroup の値を変更して、別のコンシューマグループを割り当てることも可能です。
- heartbeatInterval および fetchInOrder
heartbeatInterval は、コンシューマグループがサーバーにハートビートを送信する間隔を示します。 単位: ミリ秒。 デフォルト値は 30000 です。fetchInOrder は、コンシューマグループが同じキーのデータを順番に処理するかどうかを示します。 デフォルト値: false。
- batchSize および batchDurationMillis
ソースバッチの共通設定項目。 イベントをチャネルに書き込むトリガーとなるしきい値を示します。
- backoffSleepIncrement および maxBackoffSleep
ソーススリープの共通設定項目。 LogHub にデータが見つからない場合、LogHub データを再取得するまでの、遅延時間および最大遅延時間を示します。
- consumerPosition
- チャネルとシンクの設定
この例では、メモリチャネルと HDFS シンクが使用されています。
- 次のように HDFS シンクを設定します。
設定項目 値 hdfs.path /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H hdfs.fileType DataStream hdfs.rollInterval 3600 hdfs.round true hdfs.roundValue 60 hdfs.roundUnit minute hdfs.rollSize 0 hdfs.rollCount 0 - 次のようにメモリチャネルを設定します。
設定項目 値 capacity 2000 transactionCapacity 2000
- 次のように HDFS シンクを設定します。
Flume Agent の実行
Log Service のコンシューマグループのステータスについては、次をご参照ください。 コンシューマーグループ - ステータス表示