すべてのプロダクト
Search
ドキュメントセンター

DataHub:Fluentd プラグイン

最終更新日:Mar 13, 2026

Fluentd プラグイン

製品紹介

この Fluentd 用出力プラグインは、収集したデータを DataHub に書き込みます。このプラグインは Fluentd の出力プラグイン開発標準に準拠しており、インストールが簡単で、収集したデータを DataHub に書き込むことができます。

製品のインストール

RubyGems を使用したインストール

注意:RubyGems のソースを https://gems.ruby-china.com に変更してください。

gem install fluent-plugin-datahub

ローカルインストール

  1. Fluentd は Linux 環境でのみ実行され、Ruby のインストールが必要です。

  2. インストールモードは 2 種類あります。Fluentd をインストールしていない場合は、ワンクリックインストールモードを使用して Fluentd と DataHub プラグインの両方をインストールできます。すでに Fluentd をインストールしている場合は、スタンドアロンインストールモードを使用して DataHub ライタープラグインのみをインストールできます。

1) ワンクリックインストール:Fluentd をインストールしていない場合は、Fluentd の完全なインストールパッケージをダウンロードしてください。注意:完全なインストールパッケージには、Fluentd バージョン fluentd-0.12.25.gem が含まれています

$ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
$ cd fluentd-with-datahub
$ sudo sh install.sh

2) スタンドアロンインストール:すでに Fluentd をインストールしている場合は、Fluentd DataHub プラグインパッケージをダウンロードしてください。gem コマンドを使用して DataHub プラグインをインストールします。

$ sudo gem install --local fluent-plugin-datahub-0.12.25.gem

ユースケース

ユースケース 1:CSV ファイルのアップロード

このユースケースでは、Fluentd を使用して増分 CSV ファイルをほぼリアルタイムで DataHub にアップロードする方法を説明します。CSV ファイルのフォーマットは以下の通りです。

0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

CSV ファイルでは、各行が個別のフィールドを持つ 1 つのレコードです。ファイルはローカルパス `/temp/test.csv` に保存されます。DataHub の Topic は以下のフォーマットです。

フィールド名

フィールドタイプ

id

BIGINT

name

STRING

gender

BOOLEAN

salary

DOUBLE

my_time

TIMESTAMP

以下の Fluentd 構成を使用します。構成ファイルは `${CONFIG_HOME}/fluentd_test.conf` にあります。

<source>
  @type tail
  path your_file_path
  tag test1
  format csv
  keys id,name,gender,salary,my_time
</source>
<match test1>
  @type datahub
  access_id your_app_id
  access_key your_app_key
  endpoint http://ip:port
  project_name test_project
  topic_name fluentd_performance_test_1
  column_names ["id", "name", "gender", "salary", "my_time"]
  flush_interval 1s
  buffer_chunk_limit 3m
  buffer_queue_limit 128
  dirty_data_continue true
  dirty_data_file path_to_dirty_data_file
  retry_times 3
  put_data_batch_size 1000
</match>

次のコマンドを実行して Fluentd を起動し、CSV ファイルから DataHub へのデータ送信を開始します。

${FLUENTD_HOME}/fluentd-with-dataHub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf

ユースケース 2:Log4j ログの収集

Log4j ログは以下のフォーマットです。

11:48:43.439 [qtp1847995714-17] INFO  AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms

以下の Fluentd 構成を使用します。

 <source>
   @type tail
   path bayes.log
   tag test
   format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
 </source>
 <match test>
   @type datahub
   access_id your_access_id
   access_key your_access_key
   endpoint http://ip:port
   project_name test_project
   topic_name dataHub_fluentd_out_1
   column_names ["thread_id", "log_level", "class"]
 </match>

この構成で Fluentd を起動し、Log4j ログを DataHub に送信します。

設定パラメーター

入力プラグインの構成
tag test1                          : ルートを指定します。このルートは <match> の正規表現と照合されます。
format csv                         : CSV フォーマットでデータを収集します。
keys id,name,gender,salary,my_time : 収集する列を指定します。列名は送信先の DataHub テーブルの列名と一致する必要があります。
出力プラグインの構成
shard_id 0               : 書き込み先のシャード ID を指定します。デフォルトでは、データはラウンドロビン方式で書き込まれます。
shard_keys ["id"]         : パーティションキーを指定します。キーの値はハッシュ化され、書き込み先のシャードのインデックスを決定します。
flush_interval 1         : Fluentd は少なくとも 1 秒に 1 回データを書き込みます。デフォルトは 60 秒です。
buffer_chunk_limit 3m    : チャンクサイズです。「k」(KB) と「m」(MB) をサポートします。推奨値は 3m です。
buffer_queue_limit 128   : チャンクキューのサイズです。この値と buffer_chunk_limit で合計バッファーサイズが決まります。
put_data_batch_size 1000 : 1,000 レコードごとにデータを DataHub に書き込みます。
retry_times 3            : リトライ回数です。
retry_interval  3        : 再試行間隔 (秒) です。
dirty_data_continue true : ダーティデータが検出された場合に続行するかどうかを指定します。true に設定すると、Fluentd はリトライします。すべてのリトライが失敗した後、ダーティデータはダーティデータファイルに書き込まれます。
dirty_data_file /xxx/yyy : ダーティデータファイルのパスを指定します。
column_names ["id"]      : 収集する列を指定します。

性能テスト

テスト環境:Fluentd は 2 コア、4 GB の Linux オペレーティングシステムで実行されました。DataHub プラグインの性能テスト結果は以下の通りです。

  1. 512 B の単一レコードの場合、書き込み速度は約 2,800 レコード/秒でした。

  2. `put_data_batch_size` を増やすと速度はわずかに向上しましたが、効果は限定的でした。

  3. 100 KB の単一レコードの場合、プラグインは `put_data_batch_size` が 100 に設定されている場合にのみ機能しました。500 や 1,000 の値では、1 回の書き込み操作のデータサイズが 50 MB を超えるため、機能しませんでした。

  4. 全体の平均書き込み速度は 3 MB/s でした。

    よくある質問

    Q:Fluentd の format パラメーターの正規表現はどのように記述しますか? A:こちらのオンライン正規表現エディターをご利用ください。