このトピックでは、EMR-Flume を使用して Log Service のデータをEMR クラスターの HDFS に移行し、レコードのタイムスタンプに基づいてデータをパーティションに格納する方法について説明します。

概要

EMR は、V3.20.0 以降、Log Service ソースを備えた EMR-Flume を特徴としています。 Logtail などの Log Service のツールを使用してデータを LogHub に移動し、EMR-Flume を使用してデータを EMR クラスターの HDFS に移動できます。 詳細は、「収集方法」をご参照ください。

事前準備

Hadoop クラスターを作成し、オプションのサービスから Flume を選択します。 詳細は、「クラスターの作成」をご参照ください。

Flume の設定

  • Source の設定
    設定項目 説明
    type org.apache.flume.source.loghub.LogHubSource
    endpoint LogHub のエンドポイント VPC またはクラシックネットワークエンドポイントを使用している場合は、VPC またはクラシックネットワークが EMR クラスターと同じリージョンに展開されていることをご確認ください。 パブリックネットワークエンドポイントを使用している場合は、Flume エージェントが実行するノードにパブリック IP アドレスが割り当てられていることをご確認ください。
    project LogHub のプロジェクト
    logstore LogHub のログストア
    accessKeyId AccessKey ID
    accessKey AccessKey Secret
    useRecordTime true デフォルト値: false。 タイムスタンププロパティがヘッダーにない場合、イベントが受信された時刻はタイムスタンプとしてエンコードされ、ヘッダーに挿入されます。 Flume Agent が起動または停止したり、データの同期が遅れたりすると、データが間違ったパーティションに配置されます。 値を true に設定します。 true 値は、LogHub がデータを収集する時間をタイムスタンプとして使用することを示します。
    consumerGroup consumer_1 コンシューマグループの名前。 デフォルト値:consumer_1。
    その他の設定項目についての説明を以下に示します。
    • consumerPosition

      コンシューマグループが LogHub データを初めて使用する位置。 デフォルト値: end (最新のデータを処理することを示します)。 有効値は、begin、special、end。begin は、コンシューマグループが最も古いデータから処理を開始することを示します。 special は、コンシューマグループが指定されたオフセットでデータ処理を開始することを示します。 値が special に設定されている場合は、startTime 設定アイテムを使用してオフセットを指定する必要があります。 単位: 秒。 LogHub サーバーは、最初のデータ処理後のコンシューマグループのコンシューマポジションを記録します。 consumerPosition 値を変更するには、LogHub データを使用するコンシューマグループのステータスを消去します。 詳細については、「コンシューマグループのステータス」をご参照ください。 また、consumerGroup の値を変更して、別のコンシューマグループを割り当てることも可能です。

    • heartbeatInterval および fetchInOrder

      heartbeatInterval は、コンシューマグループがサーバーにハートビートを送信する間隔を示します。 単位: ミリ秒。 デフォルト値は 30000 です。fetchInOrder は、コンシューマグループが同じキーのデータを順番に処理するかどうかを示します。 デフォルト値: false。

    • batchSize および batchDurationMillis

      ソースバッチの共通設定項目。 イベントをチャネルに書き込むトリガーとなるしきい値を示します。

    • backoffSleepIncrement および maxBackoffSleep

      ソーススリープの共通設定項目。 LogHub にデータが見つからない場合、LogHub データを再取得するまでの、遅延時間および最大遅延時間を示します。

  • チャネルとシンクの設定
    この例では、メモリチャネルと HDFS シンクが使用されています。
    • 次のように HDFS シンクを設定します。
      設定項目
      hdfs.path /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
      hdfs.fileType DataStream
      hdfs.rollInterval 3600
      hdfs.round true
      hdfs.roundValue 60
      hdfs.roundUnit minute
      hdfs.rollSize 0
      hdfs.rollCount 0
    • 次のようにメモリチャネルを設定します。
      設定項目
      capacity 2000
      transactionCapacity 2000

Flume Agent の実行

詳しくは、「Flume の使用方法」をご参照ください。 Flume Agent の開始後、設定された HDFS パスで、レコードのタイムスタンプに基づいてパーティションに保存されているログを確認できます。

Log Service のコンシューマグループのステータスについては、次をご参照ください。 コンシューマーグループ - ステータス表示