Fluentd プラグイン
製品紹介
この Fluentd 用出力プラグインは、収集したデータを DataHub に書き込みます。このプラグインは Fluentd の出力プラグイン開発標準に準拠しており、インストールが簡単で、収集したデータを DataHub に書き込むことができます。
製品のインストール
RubyGems を使用したインストール
注意:RubyGems のソースを https://gems.ruby-china.com に変更してください。
gem install fluent-plugin-datahub
ローカルインストール
-
Fluentd は Linux 環境でのみ実行され、Ruby のインストールが必要です。
-
インストールモードは 2 種類あります。Fluentd をインストールしていない場合は、ワンクリックインストールモードを使用して Fluentd と DataHub プラグインの両方をインストールできます。すでに Fluentd をインストールしている場合は、スタンドアロンインストールモードを使用して DataHub ライタープラグインのみをインストールできます。
1) ワンクリックインストール:Fluentd をインストールしていない場合は、Fluentd の完全なインストールパッケージをダウンロードしてください。注意:完全なインストールパッケージには、Fluentd バージョン fluentd-0.12.25.gem が含まれています。
$ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
$ cd fluentd-with-datahub
$ sudo sh install.sh
2) スタンドアロンインストール:すでに Fluentd をインストールしている場合は、Fluentd DataHub プラグインパッケージをダウンロードしてください。gem コマンドを使用して DataHub プラグインをインストールします。
$ sudo gem install --local fluent-plugin-datahub-0.12.25.gem
ユースケース
ユースケース 1:CSV ファイルのアップロード
このユースケースでは、Fluentd を使用して増分 CSV ファイルをほぼリアルタイムで DataHub にアップロードする方法を説明します。CSV ファイルのフォーマットは以下の通りです。
0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111
CSV ファイルでは、各行が個別のフィールドを持つ 1 つのレコードです。ファイルはローカルパス `/temp/test.csv` に保存されます。DataHub の Topic は以下のフォーマットです。
|
フィールド名 |
フィールドタイプ |
|---|---|
|
id |
BIGINT |
|
name |
STRING |
|
gender |
BOOLEAN |
|
salary |
DOUBLE |
|
my_time |
TIMESTAMP |
以下の Fluentd 構成を使用します。構成ファイルは `${CONFIG_HOME}/fluentd_test.conf` にあります。
<source>
@type tail
path your_file_path
tag test1
format csv
keys id,name,gender,salary,my_time
</source>
<match test1>
@type datahub
access_id your_app_id
access_key your_app_key
endpoint http://ip:port
project_name test_project
topic_name fluentd_performance_test_1
column_names ["id", "name", "gender", "salary", "my_time"]
flush_interval 1s
buffer_chunk_limit 3m
buffer_queue_limit 128
dirty_data_continue true
dirty_data_file path_to_dirty_data_file
retry_times 3
put_data_batch_size 1000
</match>
次のコマンドを実行して Fluentd を起動し、CSV ファイルから DataHub へのデータ送信を開始します。
${FLUENTD_HOME}/fluentd-with-dataHub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf
ユースケース 2:Log4j ログの収集
Log4j ログは以下のフォーマットです。
11:48:43.439 [qtp1847995714-17] INFO AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms
以下の Fluentd 構成を使用します。
<source>
@type tail
path bayes.log
tag test
format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
</source>
<match test>
@type datahub
access_id your_access_id
access_key your_access_key
endpoint http://ip:port
project_name test_project
topic_name dataHub_fluentd_out_1
column_names ["thread_id", "log_level", "class"]
</match>
この構成で Fluentd を起動し、Log4j ログを DataHub に送信します。
設定パラメーター
入力プラグインの構成
tag test1 : ルートを指定します。このルートは <match> の正規表現と照合されます。
format csv : CSV フォーマットでデータを収集します。
keys id,name,gender,salary,my_time : 収集する列を指定します。列名は送信先の DataHub テーブルの列名と一致する必要があります。出力プラグインの構成
shard_id 0 : 書き込み先のシャード ID を指定します。デフォルトでは、データはラウンドロビン方式で書き込まれます。
shard_keys ["id"] : パーティションキーを指定します。キーの値はハッシュ化され、書き込み先のシャードのインデックスを決定します。
flush_interval 1 : Fluentd は少なくとも 1 秒に 1 回データを書き込みます。デフォルトは 60 秒です。
buffer_chunk_limit 3m : チャンクサイズです。「k」(KB) と「m」(MB) をサポートします。推奨値は 3m です。
buffer_queue_limit 128 : チャンクキューのサイズです。この値と buffer_chunk_limit で合計バッファーサイズが決まります。
put_data_batch_size 1000 : 1,000 レコードごとにデータを DataHub に書き込みます。
retry_times 3 : リトライ回数です。
retry_interval 3 : 再試行間隔 (秒) です。
dirty_data_continue true : ダーティデータが検出された場合に続行するかどうかを指定します。true に設定すると、Fluentd はリトライします。すべてのリトライが失敗した後、ダーティデータはダーティデータファイルに書き込まれます。
dirty_data_file /xxx/yyy : ダーティデータファイルのパスを指定します。
column_names ["id"] : 収集する列を指定します。
性能テスト
テスト環境:Fluentd は 2 コア、4 GB の Linux オペレーティングシステムで実行されました。DataHub プラグインの性能テスト結果は以下の通りです。
-
512 B の単一レコードの場合、書き込み速度は約 2,800 レコード/秒でした。
-
`put_data_batch_size` を増やすと速度はわずかに向上しましたが、効果は限定的でした。
-
100 KB の単一レコードの場合、プラグインは `put_data_batch_size` が 100 に設定されている場合にのみ機能しました。500 や 1,000 の値では、1 回の書き込み操作のデータサイズが 50 MB を超えるため、機能しませんでした。
-
全体の平均書き込み速度は 3 MB/s でした。
よくある質問
Q:Fluentd の format パラメーターの正規表現はどのように記述しますか? A:こちらのオンライン正規表現エディターをご利用ください。