このトピックでは、E-MapReduce(EMR)の Flume サービスを使用して、ログサービスの LogHub から EMR クラスタの Hadoop 分散ファイルシステム(HDFS)にデータをリアルタイムで同期する方法について説明します。同期されたデータは、タイムスタンプに基づいて HDFS パーティションに自動的に保存されます。
背景情報
ログサービスの Logtail を使用して、同期するデータを収集し、リアルタイムで LogHub にアップロードできます。次に、EMR クラスタの Flume サービスを使用して、LogHub から EMR クラスタの HDFS にデータを同期します。
LogHub へのデータのアップロード方法の詳細については、「データ収集の概要」をご参照ください。
前提条件
EMR データレイククラスタが作成され、クラスタ作成時にオプションサービスから Flume が選択されています。クラスタの作成方法の詳細については、「クラスタの作成」をご参照ください。
手順
- Flume サービスを構成します。
- Flume サービスの [構成] タブに移動します。
- 上部のナビゲーションバーで、クラスタが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。
- [ECS 上の EMR] ページで、管理するクラスタを見つけ、[アクション] 列の [サービス] をクリックします。
- [サービス] タブで、構成[flume] サービスセクションの をクリックします。
- [構成] タブで、[flume-conf.properties] サブタブをクリックします。この例では、グローバル構成が使用されています。ノードごとにクラスタを構成する場合は、独立ノード構成構成[flume] サービスの サブタブのドロップダウンリストから を選択できます。
- [flume-conf.properties] 構成項目の値に次の内容を追加します。
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.loghub.LogHubSource default-agent.sources.source1.endpoint = <yourLogHubEndpoint> default-agent.sources.source1.project = canaltest default-agent.sources.source1.logstore = canal default-agent.sources.source1.accessKeyId = yHiu*******BG2s default-agent.sources.source1.accessKey = ABctuw0M***************iKKljZy default-agent.sources.source1.useRecordTime = true default-agent.sources.source1.consumerGroup = consumer_1 default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H default-agent.sinks.k1.hdfs.fileType = DataStream default-agent.sinks.k1.hdfs.rollInterval = 3600 default-agent.sinks.k1.hdfs.round = true default-agent.sinks.k1.hdfs.roundValue = 60 default-agent.sinks.k1.hdfs.roundUnit = minute default-agent.sinks.k1.hdfs.rollSize = 0 default-agent.sinks.k1.hdfs.rollCount = 0 # メモリ内にイベントをバッファリングするチャネルを使用します default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 2000 default-agent.channels.c1.transactionCapacity = 2000 # ソースとシンクをチャネルにバインドします default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1パラメータ 説明 default-agent.sources.source1.type 値を org.apache.flume.source.loghub.LogHubSource に設定します。 default-agent.sources.source1.endpoint LogHub にアクセスするために使用されるエンドポイント。 説明 リージョンのログサービスの仮想プライベートクラウド(VPC)またはクラシックネットワークエンドポイントを使用する場合は、EMR クラスタが同じリージョンに存在することを確認してください。パブリックエンドポイントを使用する場合は、Flume エージェントが実行されているノードにパブリック IP アドレスが割り当てられていることを確認してください。default-agent.sources.source1.project ログサービスプロジェクトの名前。 default-agent.sources.source1.logstore ログストアの名前。 default-agent.sources.source1.accessKeyId Alibaba Cloud アカウントの AccessKey ID。 default-agent.sources.source1.accessKey Alibaba Cloud アカウントの AccessKey シークレット。 default-agent.sources.source1.useRecordTime このパラメータを true に設定します。 デフォルト値:false。ヘッダーに timestamp プロパティが含まれていない場合、イベントが受信された時刻がタイムスタンプとしてエンコードされ、タイムスタンプがヘッダーに挿入されます。 Flume エージェントが起動または停止した場合、またはデータ同期が遅延した場合、データは間違ったパーティションに配置されます。この問題を防ぐには、このパラメータを true に設定します。このようにして、LogHub がデータを収集した時刻がタイムスタンプとして使用されます。
default-agent.sources.source1.consumerGroup コンシューマーグループの名前。デフォルト値:consumer_1。 default-agent.sources.source1.consumerPosition コンシューマーグループが初めて LogHub データを消費する位置。デフォルト値:end。end の値は、最新のデータから消費が開始されることを指定します。 - begin:消費は最も古いデータから開始されます。
- special:特定の時点から消費が開始されます。
このパラメータを special に設定する場合は、startTime パラメータを特定の時点に設定する必要があります。単位:秒。
default-agent.sources.source1.heartbeatInterval コンシューマーグループがサーバーにハートビートを送信する間隔。単位:ミリ秒。デフォルト値:30000。 default-agent.sources.source1.fetchInOrder コンシューマーグループが同じキーを持つデータを順番に消費するかどうかを指定します。デフォルト値:false。 default-agent.sources.source1.batchSize 一度にチャネルに書き込むことができるメッセージの最大数。これは一般的なソースバッチ構成です。 default-agent.sources.source1.batchDurationMillis メッセージが一度にチャネルに書き込まれるまで待機する最大ミリ秒数。これは一般的なソースバッチ構成です。 default-agent.sources.source1.backoffSleepIncrement LogHub にデータがない場合にスリープをトリガーする初期および増分待機時間。これは一般的なソーススリープ構成です。 default-agent.sources.source1.maxBackoffSleep LogHub にデータがない場合にスリープをトリガーする最大待機時間。これは一般的なソーススリープ構成です。 default-agent.sinks.k1.hdfs.path HDFS のストレージパス。例:/tmp/flume-data/loghub/datetime=%y%m%d/hour=%H。 default-agent.sinks.k1.hdfs.fileType HDFS に保存されるファイルのタイプ。値を DataStream に設定します。 default-agent.sinks.k1.hdfs.rollInterval ファイルが生成される間隔。単位:秒。例:3600。 default-agent.sinks.k1.hdfs.round HDFS のデータが時間でパーティション分割されるかどうかを指定します。タイムスタンプは切り捨てられます。デフォルト値:true。 default-agent.sinks.k1.hdfs.roundValue default-agent.sinks.k1.hdfs.round パラメータが true に設定されている場合、このパラメータと default-agent.sinks.k1.hdfs.roundUnit パラメータを構成する必要があります。 たとえば、default-agent.sinks.k1.hdfs.roundUnit パラメータが minute に設定され、このパラメータが 60 に設定されている場合、60 分以内に生成されたデータは 1 つのファイルに書き込まれます。つまり、60 分ごとにファイルが生成されます。
default-agent.sinks.k1.hdfs.roundUnit データをパーティション分割するために使用される時間単位。デフォルト値:minute。 default-agent.sinks.k1.hdfs.rollSize ロールをトリガーするファイルサイズ。一時ファイルのサイズがこのパラメータの値に達すると、ロールに基づいて新しいファイルが生成されます。単位:バイト。 値 0 は、一時ファイルがファイルサイズに基づいてロールされないことを指定します。
default-agent.sinks.k1.hdfs.rollCount ロールをトリガーするイベントの数。イベント数がこのパラメータの値に達すると、一時ファイルがロールされて新しいファイルが生成されます。 値 0 は、一時ファイルがイベント数に基づいてロールされないことを指定します。
default-agent.channels.c1.capacity チャネルに保存されるイベントの最大数。例:2000。 default-agent.channels.c1.transactionCapacity 各チャネルがソースから取得するか、シンクにプッシュするイベントの最大数。例:2000。 オープンソースの Flume に基づいて Flume サービスのパラメータを構成します。詳細については、「Avro Source」、「Taildir Source」、「HDFS Sink」、および「File Channel」をご参照ください。
- 構成を保存します。
- 左下隅の [保存] をクリックします。
- 表示されるダイアログボックスで、実行理由を入力し、[保存] をクリックします。
- Flume サービスの [構成] タブに移動します。
- Flume サービスを開始します。
- Flume サービスの [ステータス] タブで、FlumeAgent コンポーネントを見つけ、[アクション] 列で を選択します。
- 表示されるダイアログボックスで、実行理由を入力し、[OK] をクリックします。
- [確認] メッセージで、[OK] をクリックします。
- Flume サービスを開始します。
- Flume サービスの [ステータス] タブで、 を選択します。
- 表示されるダイアログボックスで、[実行理由] フィールドに理由を入力し、[OK] をクリックします。
- [確認] メッセージで、[OK] をクリックします。
Flume サービスが開始された後、構成された HDFS パスにタイムスタンプに基づいて保存されているログを表示できます。