すべてのプロダクト
Search
ドキュメントセンター

DataHub:DataHub Fluentdプラグイン

最終更新日:Jan 12, 2025

DataHub Fluentdプラグイン

概要

Fluentd向けDataHubプラグインは、Fluentdに基づいて開発された出力プラグインであり、収集されたデータをDataHubに書き込みます。このプラグインは、Fluentd出力プラグインの開発規則に準拠しており、簡単にインストールできます。このプラグインを使用すると、収集されたデータをDataHubに簡単に書き込むことができます。

DataHub Fluentdプラグインのインストール

RubyGemsを使用してプラグインをインストールする

gemソースを https://gems.ruby-china.com に変更することをお勧めします。

gem install fluent-plugin-datahub

インストールパッケージを使用してプラグインをインストールする

  1. プラグインはLinuxにインストールする必要があります。プラグインをインストールする前に、Rubyをインストールしてください。

  2. Fluentdをインストールしていないユーザー向けに、FluentdとDataHub Fluentdプラグインの両方をインストールするための完全なインストールパッケージが提供されています。Fluentdをインストール済みのユーザー向けに、DataHub Fluentdプラグインのインストールパッケージが提供されています。

(1) FluentdとDataHub Fluentdプラグインの両方をインストールする: Fluentdをインストールしていない場合は、FluentdとDataHub Fluentdプラグインの両方をインストールするための完全なインストールパッケージをダウンロードしてください。完全なインストールパッケージにはFluentd 0.12.25が含まれています

$ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
$ cd fluentd-with-datahub
$ sudo sh install.sh

(2) DataHub Fluentdプラグインを個別にインストールする: Fluentdをインストール済みの場合は、DataHub Fluentdプラグインのインストールパッケージをダウンロードし、gemコマンドを実行してプラグインをインストールします。

$ sudo gem install --local fluent-plugin-datahub-0.12.25.gem

ユースケース

ケース 1:CSV ファイルをアップロードする

このセクションでは、Fluentd向けDataHubプラグインを使用して、CSVファイルの増分コンテンツを準リアルタイムでDataHubに書き込む方法について説明します。次の例は、CSVファイルのコンテンツ形式を示しています。

0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

各行はDataHubに書き込まれるレコードです。フィールドはカンマ(,)で区切られています。 CSV ファイルは、オンプレミスコンピューターの /temp/test.csv として保存されます。次の表は、CSV ファイルの書き込み先の DataHub トピックのスキーマを示しています。

フィールド名

データ型

id

BIGINT

name

STRING

gender

BOOLEAN

salary

DOUBLE

my_time

TIMESTAMP

この例では、次の Fluentd 構成ファイルを使用します。構成ファイルは ${CONFIG_HOME}/fluentd_test.conf として保存されます。

<source>
  @type tail
  path CSVファイルのパス
  tag test1
  format csv
  keys id,name,gender,salary,my_time
</source>
<match test1>
  @type datahub
  access_id your_app_id  // アプリケーションID
  access_key your_app_key  // アプリケーションキー
  endpoint http://ip:port
  project_name test_project
  topic_name fluentd_performance_test_1
  column_names ["id", "name", "gender", "salary", "my_time"]
  flush_interval 1s
  buffer_chunk_limit 3m
  buffer_queue_limit 128
  dirty_data_continue true
  dirty_data_file 無効なレコードファイルのパス
  retry_times 3
  put_data_batch_size 1000
</match>

次のコマンドを実行して Fluentd を起動し、CSV ファイルを DataHub に書き込みます。

${FLUENTD_HOME}/fluentd-with-dataHub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf

ケース 2:Log4j ログを収集する

次のコードは、Log4j ログの例を示しています。

11:48:43.439 [qtp1847995714-17] INFO  AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms

この例では、次の Fluentd 構成ファイルを使用します。

 <source>
   @type tail
   path bayes.log
   tag test
   format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
 </source>
 <match test>
   @type datahub
   access_id your_access_id // アクセスID
   access_key your_access_key // アクセスキー
   endpoint http://ip:port
   project_name test_project
   topic_name dataHub_fluentd_out_1
   column_names ["thread_id", "log_level", "class"]
 </match>

上記の構成ファイルを使用して Fluentd を起動し、Log4j ログを DataHub に収集します。

パラメーター

入力設定
tag test1: タグ。指定された正規表現を使用して宛先情報にマッピングされます。
format csv: データが収集されるファイルの形式。
keys id,name,gender,salary,my_time: CSV ファイルから収集されるフィールド。フィールド名は、宛先 DataHub トピックのスキーマのフィールド名と同じである必要があります。
出力設定
shard_id 0: すべてのレコードが書き込まれるシャードの ID。デフォルトでは、すべてのレコードはポーリングによってシャードに書き込まれます。
shard_keys ["id"]: シャードキーとして使用されるフィールド。ハッシュ化されたシャードキー値は、データ書き込みのインデックスとして使用されます。
flush_interval 1: データフラッシュの間隔。デフォルト値: 60 秒。
buffer_chunk_limit 3m: チャンクの最大サイズ。単位: k または m。KB または MB を示します。最大サイズを 3 MB に設定することをお勧めします。
buffer_queue_limit 128: チャンクキューの最大長。buffer_chunk_limit パラメーターと buffer_queue_limit パラメーターの両方がバッファーのサイズを決定します。
put_data_batch_size 1000: 一度に DataHub に書き込まれるレコードの数。この例では、一度に 1,000 レコードが DataHub に書き込まれます。
retry_times 3: 再試行回数。
retry_interval 3: 再試行間隔。単位: 秒。
dirty_data_continue true: 無効なレコードを無視するかどうかを指定します。true の値は、プラグインが無効なレコードを無効なレコードファイルに書き込む前に、指定された回数だけ操作を再試行することを示します。
dirty_data_file /xxx/yyy: 無効なレコードファイルが保存されるディレクトリ。
column_names ["id"]: DataHub に書き込まれるフィールドの名前。

パフォーマンステスト

パフォーマンステストの環境: Fluentd は、デュアルコア CPU と 4 GB メモリを搭載した Linux で実行されます。パフォーマンステストデータから次の点がわかります。

  1. サイズが 512 バイトの単一レコードの場合、書き込み速度は約 2,800 レコード/秒に維持されます。

  2. 一度に DataHub に書き込まれるレコードの数が増加すると、書き込み速度はわずかに増加します。

  3. サイズが 100 KB の単一レコードの場合、プラグインは 100 レコードを一度に DataHub に書き込む場合にのみ機能します。一度に DataHub に書き込まれるデータ量が大きすぎるため、500 または 1,000 レコードを一度に DataHub に書き込む場合、プラグインは機能しません。 500 または 1,000 レコードのサイズは 50 MB を超えています。

  4. 平均書き込み速度は 3 MB/秒のままです。

    FAQ

    Q: Fluentd の正規表現をどのように記述しますか? A: 正規表現エディターを使用できます。