Fluentd plugin
Product introduction
This output plugin for Fluentd writes collected data to DataHub. The plugin follows the Fluentd output plugin development standards and is easy to install and configure.
Product installation
Install using RubyGems
Note: If the default RubyGems source is slow or unreachable, change the RubyGems source to https://gems.ruby-china.com.
gem install fluent-plugin-datahub
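If you need the mirror mentioned in the note above, you can switch the RubyGems source before installing. The commands below are a sketch of that sequence; the source URLs follow the note and the standard `gem sources` syntax:

```shell
# Replace the default RubyGems source with the mirror, then install the plugin.
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
gem sources -l                      # verify which sources are active
gem install fluent-plugin-datahub
```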
Local installation
- Fluentd runs only in a Linux environment and requires Ruby to be installed.
- Two installation modes are available. If you have not installed Fluentd, you can use the one-click installation mode to install both Fluentd and the DataHub plugin. If you have already installed Fluentd, you can use the standalone installation mode to install only the DataHub plugin.
1) One-click installation: If you have not installed Fluentd, download the complete Fluentd installation package. Note: The complete installation package includes Fluentd version fluentd-0.12.25.gem.
$ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
$ cd fluentd-with-datahub
$ sudo sh install.sh
2) Standalone installation: If you have already installed Fluentd, download the Fluentd DataHub plugin package. Use the gem command to install the DataHub plugin.
$ sudo gem install --local fluent-plugin-datahub-0.12.25.gem
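After either installation mode, you can confirm that RubyGems can see the plugin; the exact version shown depends on the package you installed:

```shell
# List the locally installed plugin; a line such as
# "fluent-plugin-datahub (0.12.25)" indicates a successful installation.
gem list fluent-plugin-datahub --local
```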
Use cases
Use case 1: Upload a CSV file
This use case shows how to use Fluentd to upload incremental CSV files to DataHub in near real-time. The CSV file has the following format:
0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111
In the CSV file, each line is one record, with fields separated by commas. The file is saved to the local path `/temp/test.csv`. The DataHub topic has the following schema:
| Field name | Field type |
|---|---|
| id | BIGINT |
| name | STRING |
| gender | BOOLEAN |
| salary | DOUBLE |
| my_time | TIMESTAMP |
Use the following Fluentd configuration. The configuration file is located at `${CONFIG_HOME}/fluentd_test.conf`:
<source>
@type tail
path your_file_path
tag test1
format csv
keys id,name,gender,salary,my_time
</source>
<match test1>
@type datahub
access_id your_app_id
access_key your_app_key
endpoint http://ip:port
project_name test_project
topic_name fluentd_performance_test_1
column_names ["id", "name", "gender", "salary", "my_time"]
flush_interval 1s
buffer_chunk_limit 3m
buffer_queue_limit 128
dirty_data_continue true
dirty_data_file path_to_dirty_data_file
retry_times 3
put_data_batch_size 1000
</match>
Run the following command to start Fluentd and begin sending data from the CSV file to DataHub:
${FLUENTD_HOME}/fluentd-with-datahub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf
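Before starting the collector, you can ask Fluentd to parse the configuration and exit, which catches syntax errors early. This assumes your Fluentd build supports the `--dry-run` flag (available in later 0.12 releases):

```shell
# Validate the configuration file without starting collection.
${FLUENTD_HOME}/fluentd-with-datahub/bin/fluentd --dry-run -c ${CONFIG_HOME}/fluentd_test.conf
```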
Use case 2: Collect Log4j logs
The Log4j logs have the following format:
11:48:43.439 [qtp1847995714-17] INFO AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms
Use the following Fluentd configuration:
<source>
@type tail
path bayes.log
tag test
format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
</source>
<match test>
@type datahub
access_id your_access_id
access_key your_access_key
endpoint http://ip:port
project_name test_project
topic_name dataHub_fluentd_out_1
column_names ["thread_id", "log_level", "class"]
</match>
Start Fluentd with this configuration to send Log4j logs to DataHub.
Configuration parameters
Input plugin configuration
tag test1 : Specifies the route. The route is matched against the <match> regular expression.
format csv : Collects data in CSV format.
keys id,name,gender,salary,my_time : Specifies the columns to collect. The column names must match the field names in the destination DataHub topic.
Output plugin configuration
shard_id 0 : Specifies the shard ID to write to. By default, data is written in a round-robin manner.
shard_keys ["id"] : Specifies the partition key. The key value is hashed to determine the index of the shard to write to.
flush_interval 1 : The flush interval. With this setting, Fluentd flushes buffered data at least once per second. The default is 60s.
buffer_chunk_limit 3m : The chunk size. Supports "k" (KB) and "m" (MB). The recommended value is 3m.
buffer_queue_limit 128 : The maximum number of chunks in the buffer queue. Together with buffer_chunk_limit, this value determines the total buffer size; for example, 3m × 128 allows up to 384 MB of buffered data.
put_data_batch_size 1000 : Writes data to DataHub for every 1,000 records.
retry_times 3 : The number of retries.
retry_interval 3 : The retry interval in seconds.
dirty_data_continue true : Specifies whether to continue when dirty data is encountered. If set to true, Fluentd retries. After all retries are exhausted, the dirty data is written to a dirty data file.
dirty_data_file /xxx/yyy : Specifies the path to the dirty data file.
column_names ["id"] : Specifies the columns to collect.
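The shard_keys behavior can be pictured with a small Ruby sketch. This illustrates hash-based partitioning in general, not the plugin's exact hash function; `SHARD_COUNT` and `shard_index_for` are hypothetical names introduced here:

```ruby
require 'digest'

SHARD_COUNT = 4 # hypothetical number of shards in the topic

# Map a partition-key value to a shard index by hashing it and taking
# the remainder. Records with equal key values land on the same shard.
def shard_index_for(key_value, shard_count = SHARD_COUNT)
  Digest::MD5.hexdigest(key_value.to_s).to_i(16) % shard_count
end

shard_index_for("42") # always yields the same index for the same key
```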
Performance test
Test environment: Fluentd ran on a Linux machine with 2 CPU cores and 4 GB of memory. The performance test results for the DataHub plugin are as follows:
- For single records of 512 B, the write speed was approximately 2,800 records/s.
- Increasing `put_data_batch_size` slightly improved the speed, but the effect was not significant.
- For single records of 100 KB, the plugin worked only when `put_data_batch_size` was set to 100. Values of 500 and 1,000 did not work because the data size for a single write operation exceeded 50 MB.
- The overall average write speed was about 3 MB/s.
FAQ
Q: How do I write a regular expression for the format parameter in Fluentd?
A: Build the pattern with named capture groups and test it against sample log lines, for example in an online regular expression editor, before adding it to the configuration.
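For example, the format expression from use case 2 can be checked in plain Ruby before it goes into the Fluentd configuration; each named capture group becomes a field of the parsed record:

```ruby
# The same pattern as in the use case 2 <source> section.
pattern = /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/

# The sample Log4j line from use case 2.
line = '11:48:43.439 [qtp1847995714-17] INFO AuditInterceptor - ' \
       '[c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms'

m = pattern.match(line)
puts m[:thread_id]  # => qtp1847995714-17
puts m[:log_level]  # => INFO
puts m[:class]      # => AuditInterceptor
```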