すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:LogHub から HDFS へのデータ同期

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)の Flume サービスを使用して、ログサービスの LogHub から EMR クラスタの Hadoop 分散ファイルシステム(HDFS)にデータをリアルタイムで同期する方法について説明します。同期されたデータは、タイムスタンプに基づいて HDFS パーティションに自動的に保存されます。

背景情報

ログサービスの Logtail を使用して、同期するデータを収集し、リアルタイムで LogHub にアップロードできます。次に、EMR クラスタの Flume サービスを使用して、LogHub から EMR クラスタの HDFS にデータを同期します。

LogHub へのデータのアップロード方法の詳細については、「データ収集の概要」をご参照ください。

前提条件

EMR データレイククラスタが作成され、クラスタ作成時にオプションサービスから Flume が選択されています。クラスタの作成方法の詳細については、「クラスタの作成」をご参照ください。

手順

  1. Flume サービスを構成します。
    1. Flume サービスの [構成] タブに移動します。
      1. 上部のナビゲーションバーで、クラスタが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。
      2. [ECS 上の EMR] ページで、管理するクラスタを見つけ、[アクション] 列の [サービス] をクリックします。
      3. [サービス] タブで、構成[flume] サービスセクションの をクリックします。
    2. [構成] タブで、[flume-conf.properties] サブタブをクリックします。
      この例では、グローバル構成が使用されています。ノードごとにクラスタを構成する場合は、独立ノード構成構成[flume] サービスの サブタブのドロップダウンリストから を選択できます。
    3. [flume-conf.properties] 構成項目の値に次の内容を追加します。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.loghub.LogHubSource
      default-agent.sources.source1.endpoint = <yourLogHubEndpoint>
      default-agent.sources.source1.project = canaltest
      default-agent.sources.source1.logstore = canal
      default-agent.sources.source1.accessKeyId = yHiu*******BG2s
      default-agent.sources.source1.accessKey = ABctuw0M***************iKKljZy
      default-agent.sources.source1.useRecordTime = true
      default-agent.sources.source1.consumerGroup = consumer_1
      
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
      default-agent.sinks.k1.hdfs.fileType = DataStream
      default-agent.sinks.k1.hdfs.rollInterval = 3600
      default-agent.sinks.k1.hdfs.round = true
      default-agent.sinks.k1.hdfs.roundValue = 60
      default-agent.sinks.k1.hdfs.roundUnit = minute
      default-agent.sinks.k1.hdfs.rollSize = 0
      default-agent.sinks.k1.hdfs.rollCount = 0
      
      # メモリ内にイベントをバッファリングするチャネルを使用します
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 2000
      default-agent.channels.c1.transactionCapacity = 2000
      
      # ソースとシンクをチャネルにバインドします
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      パラメータ説明
      default-agent.sources.source1.type値を org.apache.flume.source.loghub.LogHubSource に設定します。
      default-agent.sources.source1.endpointLogHub にアクセスするために使用されるエンドポイント。
      説明 リージョンのログサービスの仮想プライベートクラウド(VPC)またはクラシックネットワークエンドポイントを使用する場合は、EMR クラスタが同じリージョンに存在することを確認してください。パブリックエンドポイントを使用する場合は、Flume エージェントが実行されているノードにパブリック IP アドレスが割り当てられていることを確認してください。
      default-agent.sources.source1.projectログサービスプロジェクトの名前。
      default-agent.sources.source1.logstoreログストアの名前。
      default-agent.sources.source1.accessKeyIdAlibaba Cloud アカウントの AccessKey ID。
      default-agent.sources.source1.accessKeyAlibaba Cloud アカウントの AccessKey シークレット。
      default-agent.sources.source1.useRecordTimeこのパラメータを true に設定します。

      デフォルト値:false。ヘッダーに timestamp プロパティが含まれていない場合、イベントが受信された時刻がタイムスタンプとしてエンコードされ、タイムスタンプがヘッダーに挿入されます。 Flume エージェントが起動または停止した場合、またはデータ同期が遅延した場合、データは間違ったパーティションに配置されます。この問題を防ぐには、このパラメータを true に設定します。このようにして、LogHub がデータを収集した時刻がタイムスタンプとして使用されます。

      default-agent.sources.source1.consumerGroupコンシューマーグループの名前。デフォルト値:consumer_1。
      default-agent.sources.source1.consumerPositionコンシューマーグループが初めて LogHub データを消費する位置。デフォルト値:end。end の値は、最新のデータから消費が開始されることを指定します。
      • begin:消費は最も古いデータから開始されます。
      • special:特定の時点から消費が開始されます。

        このパラメータを special に設定する場合は、startTime パラメータを特定の時点に設定する必要があります。単位:秒。

      LogHub サーバーは、最初のデータ消費後にコンシューマーグループの消費位置を記録します。最初のデータ消費後に consumerPosition パラメータの値を変更するには、コンシューマーグループのステータス情報をクリアするか、consumerGroup パラメータの値を変更して新しいコンシューマーグループを構成します。
      default-agent.sources.source1.heartbeatIntervalコンシューマーグループがサーバーにハートビートを送信する間隔。単位:ミリ秒。デフォルト値:30000。
      default-agent.sources.source1.fetchInOrderコンシューマーグループが同じキーを持つデータを順番に消費するかどうかを指定します。デフォルト値:false。
      default-agent.sources.source1.batchSize一度にチャネルに書き込むことができるメッセージの最大数。これは一般的なソースバッチ構成です。
      default-agent.sources.source1.batchDurationMillisメッセージが一度にチャネルに書き込まれるまで待機する最大ミリ秒数。これは一般的なソースバッチ構成です。
      default-agent.sources.source1.backoffSleepIncrementLogHub にデータがない場合にスリープをトリガーする初期および増分待機時間。これは一般的なソーススリープ構成です。
      default-agent.sources.source1.maxBackoffSleepLogHub にデータがない場合にスリープをトリガーする最大待機時間。これは一般的なソーススリープ構成です。
      default-agent.sinks.k1.hdfs.pathHDFS のストレージパス。例:/tmp/flume-data/loghub/datetime=%y%m%d/hour=%H。
      default-agent.sinks.k1.hdfs.fileTypeHDFS に保存されるファイルのタイプ。値を DataStream に設定します。
      default-agent.sinks.k1.hdfs.rollIntervalファイルが生成される間隔。単位:秒。例:3600。
      default-agent.sinks.k1.hdfs.roundHDFS のデータが時間でパーティション分割されるかどうかを指定します。タイムスタンプは切り捨てられます。デフォルト値:true。
      default-agent.sinks.k1.hdfs.roundValuedefault-agent.sinks.k1.hdfs.round パラメータが true に設定されている場合、このパラメータと default-agent.sinks.k1.hdfs.roundUnit パラメータを構成する必要があります。

      たとえば、default-agent.sinks.k1.hdfs.roundUnit パラメータが minute に設定され、このパラメータが 60 に設定されている場合、60 分以内に生成されたデータは 1 つのファイルに書き込まれます。つまり、60 分ごとにファイルが生成されます。

      default-agent.sinks.k1.hdfs.roundUnitデータをパーティション分割するために使用される時間単位。デフォルト値:minute。
      default-agent.sinks.k1.hdfs.rollSizeロールをトリガーするファイルサイズ。一時ファイルのサイズがこのパラメータの値に達すると、ロールに基づいて新しいファイルが生成されます。単位:バイト。

      値 0 は、一時ファイルがファイルサイズに基づいてロールされないことを指定します。

      default-agent.sinks.k1.hdfs.rollCountロールをトリガーするイベントの数。イベント数がこのパラメータの値に達すると、一時ファイルがロールされて新しいファイルが生成されます。

      値 0 は、一時ファイルがイベント数に基づいてロールされないことを指定します。

      default-agent.channels.c1.capacityチャネルに保存されるイベントの最大数。例:2000。
      default-agent.channels.c1.transactionCapacity各チャネルがソースから取得するか、シンクにプッシュするイベントの最大数。例:2000。

      オープンソースの Flume に基づいて Flume サービスのパラメータを構成します。詳細については、「Avro Source」、「Taildir Source」、「HDFS Sink」、および「File Channel」をご参照ください。

    4. 構成を保存します。
      1. 左下隅の [保存] をクリックします。
      2. 表示されるダイアログボックスで、実行理由を入力し、[保存] をクリックします。
  2. Flume サービスを開始します。
    1. Flume サービスの [ステータス] タブで、FlumeAgent コンポーネントを見つけ、[アクション] 列で [詳細] > [再起動] を選択します。
    2. 表示されるダイアログボックスで、実行理由を入力し、[OK] をクリックします。
    3. [確認] メッセージで、[OK] をクリックします。
  3. Flume サービスを開始します。
    1. Flume サービスの [ステータス] タブで、[詳細] > [再起動] を選択します。
    2. 表示されるダイアログボックスで、[実行理由] フィールドに理由を入力し、[OK] をクリックします。
    3. [確認] メッセージで、[OK] をクリックします。
    Flume サービスが開始された後、構成された HDFS パスにタイムスタンプに基づいて保存されているログを表示できます。