
Last Updated: May 20, 2022

DataHub plug-in for Fluentd

Overview

The DataHub plug-in for Fluentd is an output plug-in developed based on Fluentd that writes collected data to DataHub. The plug-in complies with the development conventions of Fluentd output plug-ins and is easy to install and use.

Install the DataHub plug-in for Fluentd

Install the plug-in by using RubyGems

We recommend that you change the gem source to https://gems.ruby-china.com before you run the installation command.
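
For example, you can switch the gem source as follows. The rubygems.org entry removed here is only an assumption about your current source list:

gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
gem sources -l

Then run the following command to install the plug-in: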

gem install fluent-plugin-datahub

Install the plug-in by using an installation package

  1. The plug-in must be installed on Linux. Before you install the plug-in, install Ruby.

  2. If you have not installed Fluentd, use the full installation package, which installs both Fluentd and the DataHub plug-in for Fluentd. If you have already installed Fluentd, use the installation package that contains only the plug-in.

(1) Install both Fluentd and the DataHub plug-in for Fluentd: If you have not installed Fluentd, download the full installation package and run the following commands. Note that the full installation package provides Fluentd 0.12.25.

$ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
$ cd fluentd-with-datahub
$ sudo sh install.sh
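
To check that the installation succeeded, you can print the Fluentd version. The path below follows the ${FLUENTD_HOME} layout used later in this topic and is an assumption about where you extracted the package:

$ ${FLUENTD_HOME}/fluentd-with-datahub/bin/fluentd --version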

(2) Separately install the DataHub plug-in for Fluentd: If you have installed Fluentd, download the installation package of the DataHub plug-in for Fluentd and run the following gem command to install the plug-in.

$ sudo gem install --local fluent-plugin-datahub-0.12.25.gem
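
To confirm that the plug-in is installed, you can list it with the gem command, for example:

$ gem list fluent-plugin-datahub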

Use cases

Case 1: Upload a CSV file

This section describes how to use the DataHub plug-in for Fluentd to write the incremental content of a CSV file to DataHub in near real time. The following example shows the content format of the CSV file.

0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

Each line is a record to be written to DataHub. Fields are separated by commas (,). The CSV file is saved as /temp/test.csv on the on-premises computer. The following table describes the schema of the DataHub topic to which the CSV file is written.

Field name    Data type
id            BIGINT
name          STRING
gender        BOOLEAN
salary        DOUBLE
my_time       TIMESTAMP

The following Fluentd configuration file is used in this example. The configuration file is saved as ${CONFIG_HOME}/fluentd_test.conf.

<source>
  @type tail
  path /temp/test.csv
  tag test1
  format csv
  keys id,name,gender,salary,my_time
</source>
<match test1>
  @type datahub
  access_id your_app_id
  access_key your_app_key
  endpoint http://ip:port
  project_name test_project
  topic_name fluentd_performance_test_1
  column_names ["id", "name", "gender", "salary", "my_time"]
  flush_interval 1s
  buffer_chunk_limit 3m
  buffer_queue_limit 128
  dirty_data_continue true
  dirty_data_file /xxx/yyy
  retry_times 3
  put_data_batch_size 1000
</match>

Run the following command to start Fluentd to write the CSV file to DataHub:

${FLUENTD_HOME}/fluentd-with-datahub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf
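
To observe the near-real-time upload, you can append a record to the CSV file while Fluentd is running; the tail input collects new lines as they are appended. The record below is only an illustrative value that matches the topic schema:

echo "11,abcdefghijklmnopqrstuvwxyz1234,true,100.1,14321111111" >> /temp/test.csv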

Case 2: Collect Log4j logs

The following code shows a sample Log4j log:

11:48:43.439 [qtp1847995714-17] INFO  AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms

The following Fluentd configuration file is used in this example:

 <source>
   @type tail
   path bayes.log
   tag test
   format /(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
 </source>
 <match test>
   @type datahub
   access_id your_access_id
   access_key your_access_key
   endpoint http://ip:port
   project_name test_project
   topic_name dataHub_fluentd_out_1
   column_names ["thread_id", "log_level", "class"]
 </match>

Start Fluentd with the preceding configuration file to collect Log4j logs and write them to DataHub.

Parameters

Input configuration

tag test1: the tag of the collected data. Fluentd matches this tag against the pattern in the <match> section to route records to the destination.
format csv: the format of the file from which data is collected.
keys id,name,gender,salary,my_time: the fields to be collected from the CSV file. The field names must be the same as those in the schema of the destination DataHub topic.

Output configuration

shard_id 0: the ID of the shard to which all records are written. If this parameter is not specified, records are written to the shards in round-robin mode.
shard_keys ["id"]: the fields used as shard keys. The hash values of the shard key fields determine the shard to which each record is written.
flush_interval 1: the interval at which buffered data is flushed. Default value: 60s.
buffer_chunk_limit 3m: the maximum size of a chunk. Unit: k (KB) or m (MB). We recommend that you set this parameter to 3m.
buffer_queue_limit 128: the maximum length of the chunk queue. The buffer_chunk_limit and buffer_queue_limit parameters together determine the total size of the buffer.
put_data_batch_size 1000: the number of records to be written to DataHub in each request. In this example, 1,000 records are written each time.
retry_times 3: the number of retries.
retry_interval 3: the interval between retries. Unit: seconds.
dirty_data_continue true: specifies whether to ignore dirty records. A value of true indicates that the plug-in retries the write for the specified number of times and then writes the dirty records to the dirty record file.
dirty_data_file /xxx/yyy: the path of the dirty record file.
column_names ["id"]: the names of the fields to be written to DataHub.

Performance testing

Test environment: Fluentd runs on Linux with a dual-core CPU and 4 GB of memory. The following points can be observed from the performance testing data:

  1. For records of 512 bytes each, the write speed stays at about 2,800 records per second.

  2. As the number of records written to DataHub at a time increases, the write speed increases slightly.

  3. For records of 100 KB each, the plug-in works only when 100 records are written to DataHub at a time. Writing 500 or 1,000 records at a time fails because a single request carries too much data: at 100 KB per record, 500 or 1,000 records amount to about 50 MB or more.

  4. The average write speed remains at 3 MB/s.

FAQ

Q: How do I write regular expressions for Fluentd?
A: You can use a regular expression editor to write and test the expression.
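
If you want to check a format regular expression before using it in a configuration file, one option is to test it directly in Ruby, because Fluentd interprets these patterns as Ruby regular expressions. The following standalone snippet is only a sketch; it reuses the pattern from Case 2 and assumes Ruby 2.4 or later for named_captures:

# test_format.rb: check a Fluentd format pattern against a sample log line
pattern = /(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
line = '11:48:43.439 [qtp1847995714-17] INFO  AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms'
match = pattern.match(line)
# Print the named captures if the pattern matches, or "no match" otherwise.
puts(match ? match.named_captures : 'no match')

Run the script with: ruby test_format.rb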