
DataHub: Fluentd plugin

Last Updated: Mar 12, 2026

Fluentd plugin

Product introduction

This output plugin for Fluentd writes collected data to DataHub. It follows the Fluentd output plugin development standards and is easy to install and configure.

Product installation

Install using RubyGems

Note: Change the RubyGems source to https://gems.ruby-china.com.

gem install fluent-plugin-datahub

Local installation

  1. Fluentd runs only in a Linux environment and requires Ruby to be installed.

  2. Two installation modes are available. If Fluentd is not installed, use the one-click installation mode, which installs both Fluentd and the DataHub plugin. If Fluentd is already installed, use the standalone installation mode to install only the DataHub writer plugin.

1) One-click installation: If you have not installed Fluentd, download the complete Fluentd installation package and run the installation script. Note: The complete package includes Fluentd 0.12.25 (fluentd-0.12.25.gem).

$ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
$ cd fluentd-with-datahub
$ sudo sh install.sh

2) Standalone installation: If you have already installed Fluentd, download the Fluentd DataHub plugin package. Use the gem command to install the DataHub plugin.

$ sudo gem install --local fluent-plugin-datahub-0.12.25.gem

Use cases

Use case 1: Upload a CSV file

This use case shows how to use Fluentd to upload incremental CSV files to DataHub in near real-time. The CSV file has the following format:

0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

In the CSV file, each line is one record whose comma-separated values correspond to the topic fields. The file is saved to the local path `/temp/test.csv`. The DataHub topic has the following schema:

| Field name | Field type |
| ---------- | ---------- |
| id         | BIGINT     |
| name       | STRING     |
| gender     | BOOLEAN    |
| salary     | DOUBLE     |
| my_time    | TIMESTAMP  |
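Each CSV line maps to one DataHub record with the field types above. As an illustration only (this is not the plugin's actual code, and the conversion rules are assumptions for demonstration), the mapping for the first sample line can be sketched in Ruby:

```ruby
require 'csv'

# Illustrative sketch: how one sample CSV line maps onto the topic schema.
# The conversion rules below are assumptions for demonstration, not the
# plugin's implementation.
line = '0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111'
id, name, gender, salary, my_time = CSV.parse_line(line)

record = {
  'id'      => Integer(id),      # BIGINT
  'name'    => name,             # STRING
  'gender'  => gender == 'true', # BOOLEAN
  'salary'  => Float(salary),    # DOUBLE
  'my_time' => Integer(my_time)  # TIMESTAMP (numeric value, as in the sample)
}
```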

Use the following Fluentd configuration. The configuration file is located at `${CONFIG_HOME}/fluentd_test.conf`:

<source>
  @type tail
  path your_file_path
  tag test1
  format csv
  keys id,name,gender,salary,my_time
</source>
<match test1>
  @type datahub
  access_id your_app_id
  access_key your_app_key
  endpoint http://ip:port
  project_name test_project
  topic_name fluentd_performance_test_1
  column_names ["id", "name", "gender", "salary", "my_time"]
  flush_interval 1s
  buffer_chunk_limit 3m
  buffer_queue_limit 128
  dirty_data_continue true
  dirty_data_file path_to_dirty_data_file
  retry_times 3
  put_data_batch_size 1000
</match>

Run the following command to start Fluentd and begin sending data from the CSV file to DataHub:

${FLUENTD_HOME}/fluentd-with-datahub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf
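Because the `tail` input forwards only lines appended after it starts, you can exercise the pipeline by appending a record to the file while Fluentd is running. A minimal sketch of that incremental behavior, using a temporary file in place of `/temp/test.csv`:

```ruby
require 'tempfile'

# Sketch: the tail input forwards only newly appended lines, so appending
# a record is what triggers a near-real-time upload. A temporary file
# stands in for /temp/test.csv here.
file = Tempfile.new(['test', '.csv'])
file.puts '10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111'
file.flush

# Simulate a new record arriving; Fluentd would pick up only this line.
File.open(file.path, 'a') { |f| f.puts '11,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111' }

puts File.readlines(file.path).length
```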

Use case 2: Collect Log4j logs

The Log4j logs have the following format:

11:48:43.439 [qtp1847995714-17] INFO  AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms

Use the following Fluentd configuration:

<source>
  @type tail
  path bayes.log
  tag test
  format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
</source>
<match test>
  @type datahub
  access_id your_access_id
  access_key your_access_key
  endpoint http://ip:port
  project_name test_project
  topic_name dataHub_fluentd_out_1
  column_names ["thread_id", "log_level", "class"]
</match>

Start Fluentd with this configuration to send Log4j logs to DataHub.
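You can verify the `format` regular expression against the sample log line before deploying. A quick Ruby check using the same pattern (with the timestamp dot escaped; otherwise identical):

```ruby
# The same pattern as the format parameter in the configuration above,
# with the dot in the timestamp escaped.
pattern = /(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/

line = '11:48:43.439 [qtp1847995714-17] INFO  AuditInterceptor - ' \
       '[c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m' \
       '/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms'

m = line.match(pattern)
puts m[:thread_id]  # qtp1847995714-17
puts m[:log_level]  # INFO
puts m[:class]      # AuditInterceptor
```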

Configuration parameters

Input plugin configuration

tag test1                          : Specifies the event tag. Events with this tag are routed to the matching <match> section.
format csv                         : Parses the collected data as CSV.
keys id,name,gender,salary,my_time : Specifies the columns to collect. The column names must match the field names in the destination DataHub topic.

Output plugin configuration

shard_id 0               : Specifies the shard ID to write to. By default, data is written to shards in a round-robin manner.
shard_keys ["id"]        : Specifies the partition keys. The key values are hashed to determine the shard to write to.
flush_interval 1         : Flushes data at least once per second. The default is 60s.
buffer_chunk_limit 3m    : The maximum chunk size. Supports the suffixes "k" (KB) and "m" (MB). The recommended value is 3m.
buffer_queue_limit 128   : The maximum chunk queue length. Together with buffer_chunk_limit, this value determines the total buffer size.
put_data_batch_size 1000 : Writes data to DataHub once every 1,000 records.
retry_times 3            : The number of retries.
retry_interval 3         : The retry interval, in seconds.
dirty_data_continue true : Specifies whether to continue when dirty data is encountered. If set to true, Fluentd retries, and after the retries are exhausted it writes the dirty data to the dirty data file.
dirty_data_file /xxx/yyy : Specifies the path of the dirty data file.
column_names ["id"]      : Specifies the columns to write.
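As a worked example of how buffer_chunk_limit and buffer_queue_limit combine, the total buffer capacity for the example values above is:

```ruby
# Total buffer capacity implied by the example settings:
# buffer_chunk_limit (3 MB per chunk) * buffer_queue_limit (128 chunks)
buffer_chunk_limit_mb = 3
buffer_queue_limit    = 128
total_buffer_mb = buffer_chunk_limit_mb * buffer_queue_limit
puts "#{total_buffer_mb} MB"  # 384 MB
```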

Performance test

Test environment: Fluentd ran on a Linux machine with 2 CPU cores and 4 GB of memory. The performance test results for the DataHub plugin are as follows:

  1. For single records of 512 B, the write speed was approximately 2,800 records/s.

  2. Increasing `put_data_batch_size` slightly improved the speed, but the effect was not significant.

  3. For single records of 100 KB, the plugin worked only when `put_data_batch_size` was set to 100. Values of 500 and 1,000 did not work because the data size for a single write operation exceeded 50 MB.

  4. The total average write speed was 3 MB/s.

FAQ

Q: How do I write a regular expression for the format parameter in Fluentd?

A: Use an online regular expression editor for Fluentd to build and test the pattern against sample log lines.