DataHub plug-in for Flume
Overview
The DataHub plug-in for Flume is a plug-in for data subscription and publishing that is developed based on Apache Flume. The plug-in can write collected data to DataHub, and it can also read data from DataHub and write the data to other systems. The plug-in complies with the development conventions of Apache Flume plug-ins and is easy to install. You can use it to publish data to DataHub and subscribe to the data in DataHub.
Install Apache Flume and the DataHub plug-in for Flume
Environment requirements
Java Development Kit (JDK) V1.8 or later
Apache Maven 3.x
Flume-NG 1.x
Download Apache Flume
Download Apache Flume and decompress the package. Skip the download if you already have Apache Flume.
$ tar zxvf apache-flume-1.9.0-bin.tar.gz
For convenience, ${FLUME_HOME} is used to indicate the home directory of Apache Flume in the following description.
Install the DataHub plug-in for Flume
The plug-in must be installed in Linux.
Use the installation package
Download the latest package of the DataHub plug-in for Flume. To download earlier versions of the plug-in, see the "Download earlier versions" section of this topic.
Extract the DataHub plug-in for Flume from the package and place the plug-in in the ${FLUME_HOME}/plugins.d directory.
$ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
$ cd aliyun-flume-datahub-sink-x.x.x
$ mkdir ${FLUME_HOME}/plugins.d
$ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
Use the source code
Download the source code of the DataHub plug-in for Flume from GitHub.
Compile the code to install the plug-in.
$ cd aliyun-maxcompute-data-collectors
$ mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
$ cd flume-plugin/target
$ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
$ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
Parameters
Parameters in a sink
Parameter | Default value | Required | Description |
---|---|---|---|
datahub.endPoint | - | Yes | The endpoint of DataHub. |
datahub.accessId | - | Yes | The AccessKey ID used to access DataHub. |
datahub.accessKey | - | Yes | The AccessKey secret used to access DataHub. |
datahub.project | - | Yes | The name of the DataHub project. |
datahub.topic | - | Yes | The name of the DataHub topic. |
datahub.shard.ids | IDs of all shards | No | The shards in DataHub to which data is to be written. Separate multiple IDs with commas (,), such as 0,1,2. Each time data is written, a shard is randomly selected from the specified shard list. If this parameter is not set and shards are merged or split, the plug-in automatically adjusts the shard list. You can also modify the configuration file to manually adjust the shard list. |
datahub.enablePb | true | No | Specifies whether to enable Protocol Buffers (Protobuf) for data transfer. If Protobuf is not supported when you use Apsara Stack, you must set this parameter to false. |
datahub.compressType | none | No | Specifies whether to enable compression for data transfer. The LZ4 and DEFLATE compression formats are supported. |
datahub.batchSize | 1000 | No | The maximum number of records that can be written to DataHub at a time. |
datahub.maxBufferSize | 2*1024*1024 | No | The maximum amount of data that can be written to DataHub at a time. Unit: byte. We recommend that you do not modify this parameter. If the amount of data to be written at a time is large, the write operation may fail. |
datahub.batchTimeout | 5 | No | The time to wait before data is written to DataHub. Unit: seconds. This parameter takes effect only when the data amount does not reach the threshold specified by the batchSize parameter. |
datahub.retryTimes | 3 | No | The maximum number of retries allowed after a write failure. |
datahub.retryInterval | 5 | No | The interval between two consecutive retries after a data write failure. Unit: seconds. |
datahub.dirtyDataContinue | true | No | Specifies whether to ignore dirty records. If you set this parameter to true, a dirty record is automatically written, with a comma (,) as the delimiter, to the file that stores dirty records. This does not affect subsequent data processing. |
datahub.dirtyDataFile | DataHub-Flume-dirty-file | No | The name of the file that stores dirty records. |
serializer | - | Yes | The data parsing method. Valid values: DELIMITED, JSON, and REGEX. If DELIMITED is selected, the data is parsed based on the specified delimiter. If JSON is selected, each row is parsed as a single-level JSON array. If REGEX is selected, the data is parsed based on the specified regular expression. |
serializer.delimiter | , | No | The delimiter that is used to separate fields. If you want to use special characters, you must enclose the characters in double quotation marks (""), such as "\t". |
serializer.regex | (.*) | No | The regular expression used for data parsing. The values of each field are parsed into a group. |
serializer.fieldnames | - | Yes | The mappings between the input fields and DataHub fields. The input fields are matched in order. To leave an input field unmapped, skip this field. For example, c1,c2,,c3 indicates that the first, second, and fourth input fields are mapped to the c1, c2, and c3 fields in DataHub. |
serializer.charset | UTF-8 | No | The encoding format for data parsing. |
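The mapping behavior of the DELIMITED serializer and the serializer.fieldnames parameter can be sketched in Python (the parse_delimited helper is hypothetical, not part of the plug-in):

```python
# Sketch of how the DELIMITED serializer maps input fields to DataHub
# fields: input fields are taken in order, and an empty name in
# serializer.fieldnames means "skip this input field".
def parse_delimited(line, fieldnames, delimiter=","):
    values = line.split(delimiter)
    record = {}
    for name, value in zip(fieldnames, values):
        if name:  # an empty name leaves the input field unmapped
            record[name] = value
    return record

# "c1,c2,,c3": the 1st, 2nd, and 4th input fields map to c1, c2, c3.
row = parse_delimited("a,b,ignored,d", ["c1", "c2", "", "c3"])
```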
Parameters in a source
Parameter | Default value | Required | Description |
---|---|---|---|
datahub.endPoint | - | Yes | The endpoint of DataHub. |
datahub.accessId | - | Yes | The AccessKey ID used to access DataHub. |
datahub.accessKey | - | Yes | The AccessKey secret used to access DataHub. |
datahub.project | - | Yes | The name of the DataHub project. |
datahub.topic | - | Yes | The name of the DataHub topic. |
datahub.subId | - | Yes | The ID of the subscription to DataHub. |
datahub.startTime | - | No | The point in time from which data is to be read from DataHub. Specify the time in the format of yyyy-MM-dd HH:mm:ss. If you set this parameter, the subscription offset is reset, and then data is read based on the subscription. |
datahub.shard.ids | - | No | The shards in DataHub from which data is to be read. Separate multiple IDs with commas (,). Example: 0,1,2. Each time data is read, a shard is randomly selected from the specified shard list. If you do not set this parameter, collaborative consumption is enabled for data read operations, and shards are automatically allocated so that each source has a balanced load when multiple sources are configured. We recommend that you do not set this parameter. |
datahub.enablePb | true | No | Specifies whether to enable Protobuf for data transfer. If Protobuf is not supported when you use Apsara Stack, you must set this parameter to false. |
datahub.compressType | none | No | Specifies whether to enable compression for data transfer. The LZ4 and DEFLATE compression formats are supported. |
datahub.batchSize | 1000 | No | The maximum number of records that can be read from DataHub at a time. |
datahub.batchTimeout | 5 | No | The time to wait before data is read from DataHub. Unit: seconds. This parameter takes effect only when the data amount does not reach the threshold specified by the batchSize parameter. |
datahub.retryTimes | 3 | No | The maximum number of retries allowed after a read failure. By default, the interval between two consecutive retries is 1 second and cannot be changed. |
datahub.autoCommit | true | No | Specifies whether the consumer automatically submits the consumption offset. If you set this parameter to true, the consumer automatically submits the consumption offset. In this case, the data may not be consumed but the consumption offset is submitted. If you set this parameter to false, the consumption offset is submitted after the data is submitted to the Flume channel. |
datahub.offsetCommitTimeout | 30 | No | The interval at which the consumer automatically submits the consumption offset. Unit: seconds. |
datahub.sessionTimeout | 60 | No | The session timeout period. Sources work in collaborative consumption mode. If collaborative consumption times out and no heartbeat message is sent, the session is automatically disabled. |
serializer | - | Yes | The data parsing method. Set the value to DELIMITED. The fields in a row are in the order specified by the DataHub schema and are separated by the specified delimiter. |
serializer.delimiter | , | No | The delimiter that is used to separate fields. If you want to use special characters, you must enclose the characters in double quotation marks (""), such as "\t". |
serializer.charset | UTF-8 | No | The encoding format for data parsing. |
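The batchSize and batchTimeout parameters in both tables work together: a batch is flushed when it reaches batchSize records, or when batchTimeout seconds pass while the batch is still below that threshold. A minimal Python sketch of this model (the BatchBuffer class is illustrative, not the plug-in's code):

```python
import time

class BatchBuffer:
    """Flush when batchSize records accumulate or batchTimeout elapses."""
    def __init__(self, batch_size=1000, batch_timeout=5.0):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.records = []
        self.first_arrival = None

    def add(self, record, now=None):
        now = time.monotonic() if now is None else now
        if self.first_arrival is None:
            self.first_arrival = now
        self.records.append(record)
        return self.should_flush(now)

    def should_flush(self, now):
        if len(self.records) >= self.batch_size:
            return True  # the size threshold was reached
        # the timeout only matters while the batch is below batchSize
        return now - self.first_arrival >= self.batch_timeout

buf = BatchBuffer(batch_size=3, batch_timeout=5.0)
```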
Use cases of sinks
Case 1: DELIMITED serializer
If you select DELIMITED as the data parsing method, each row is regarded as a record, and the data is parsed based on the specified delimiter. This case shows you how to use the DataHub plug-in for Flume to write a CSV file that contains multiple records to DataHub in near real time.
Sample data
Save the following data as the test.csv file in the local directory /temp/:
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289
Schema of a DataHub topic
The following table describes the schema of the DataHub topic that corresponds to the preceding data.
Field name | Data type |
---|---|
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
Plug-in configuration file
Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.
Note: Data may be lost when an exec source is used because the exec source cannot ensure that a source event is submitted to the Flume channel. For example, if you run the tail command to collect data while the Flume channel is full, the collected data is lost. We recommend that you use a spooling directory source or a taildir source. Case 3 describes a taildir source.
The static file test.csv in the /temp/ directory is used as the data source in this case. If log data is dynamically written to the file, you can run the tail -F logFile command to collect log data in real time.
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.csv
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the plug-in
The -Dflume.root.logger=INFO,console option can be used to print logs to the console in real time. If you require more information, you can use the debugging mode. Run the following commands to start the DataHub plug-in for Flume to write the CSV file to DataHub:
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
Case 2: REGEX serializer
If you select REGEX as the data parsing method, each row is regarded as a record, and the data is parsed based on the specified regular expression. The content of a record is expressed in multiple groups. This case shows you how the DataHub plug-in for Flume uses a regular expression to write a CSV file to DataHub in near real time.
Sample data
Save the following data as the test.csv file in the local directory /temp/. The data to be synchronized is the content after the dates.
[2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
[2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
[2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
[2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
[2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
[2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
[2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
[2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
[2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
[2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
[2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
[2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938
Schema of a DataHub topic
The following table describes the schema of the DataHub topic that corresponds to the preceding data.
Field name | Data type |
---|---|
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
Plug-in configuration file
Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.
Note: Data may be lost when an exec source is used because the exec source cannot ensure that a source event is submitted to the Flume channel. For example, if you run the tail command to collect data while the Flume channel is full, the collected data is lost. We recommend that you use a spooling directory source or a taildir source. Case 3 describes a taildir source.
The static file test.csv in the /temp/ directory is used as the data source in this case. If log data is dynamically written to the file, you can run the tail -F logFile command to collect log data in real time.
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.csv
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = REGEX
a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
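Before starting the agent, the serializer.regex value above can be checked locally. A minimal Python sketch (the double backslashes required by the properties file are reduced to single ones in the raw string):

```python
import re

# Same pattern as serializer.regex in the configuration above.
pattern = re.compile(
    r"\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\] "
    r"(\d+),(\S+),([a-z]+),([-+]?[0-9]*\.?[0-9]*),(\d+),([-+]?[0-9]*\.?[0-9]*)"
)

# First row of the sample data; the bracketed timestamp is not captured.
line = ("[2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,"
        "true,1254409.5059812006,1573543208698,1254409.5059819978")
match = pattern.match(line)
fields = dict(zip(["id", "name", "gender", "salary", "my_time", "decimal"],
                  match.groups()))
```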
Start the plug-in
The -Dflume.root.logger=INFO,console option can be used to print logs to the console in real time. If you require more information, you can use the debugging mode. Run the following commands to start the DataHub plug-in for Flume to write the CSV file to DataHub:
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
Case 3: Flume taildir source
As mentioned in the preceding parts, when the DataHub plug-in for Flume uses an exec source, data may be lost. Therefore, we recommend that you do not use exec sources in actual production environments. To collect local logs, you can use a taildir source or a spooling directory source. This case shows you how to use a taildir source to collect log data. If you use a taildir source, you can specify a file group. The taildir source observes the specified files and reads the new rows that are appended to the files in near real time. If new rows are being written, the taildir source retries reading the rows until the write operation is complete. The taildir source stores the last read position of each file in the positionFile file in the JSON format. The recorded read positions are not updated if the source event fails to be submitted to the Flume channel. Therefore, taildir sources are reliable.
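The position-file mechanism can be sketched in Python as follows (a simplified illustration of the behavior described above, with hypothetical file names; it is not the taildir source's actual code):

```python
import json
import os
import tempfile

# Simplified model of the taildir source: remember the byte offset of
# each tailed file in a JSON position file, and persist the new offset
# only after the read lines have been handed over successfully.
def read_new_lines(log_path, position_path):
    positions = {}
    if os.path.exists(position_path):
        with open(position_path) as f:
            positions = json.load(f)
    offset = positions.get(log_path, 0)
    with open(log_path) as f:
        f.seek(offset)          # resume from the last recorded position
        lines = f.readlines()
        new_offset = f.tell()
    # Simulate a successful commit to the channel, then persist the offset.
    positions[log_path] = new_offset
    with open(position_path, "w") as f:
        json.dump(positions, f)
    return lines

# Demo: tail a file twice; only newly appended rows appear the second time.
workdir = tempfile.mkdtemp()
log_file = os.path.join(workdir, "test.log")
position_file = os.path.join(workdir, "taildir_position.json")
with open(log_file, "w") as f:
    f.write("one\ntwo\n")
first = read_new_lines(log_file, position_file)
with open(log_file, "a") as f:
    f.write("three\n")
second = read_new_lines(log_file, position_file)
```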
Sample data
All log data is appended to the source file in the following format. The log file is named in the *.log format.
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
Schema of a DataHub topic
The following table describes the schema of the DataHub topic that corresponds to the preceding data.
Field name | Data type |
---|---|
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
Plug-in configuration file
Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file.
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /temp/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /temp/.*log
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the plug-in
The -Dflume.root.logger=INFO,console option can be used to print logs to the console in real time. If you require more information, you can use the debugging mode. Run the following commands to start the DataHub plug-in for Flume to write the log data to DataHub:
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
Case 4: JSON serializer
If you select JSON as the data parsing method, each row is regarded as a record in the form of a single-level JSON object. The nested content of a first-level field is regarded as a string. If a first-level field is included in the mappings specified by the serializer.fieldnames parameter, the field is mapped to a field in the destination DataHub topic. This case shows you how the DataHub plug-in for Flume uses the JSON parsing method to write a log file to DataHub in near real time.
Sample data
Save the following data as the test.json file in the local directory /temp/:
{"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
{"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}
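A minimal Python sketch of the single-level parsing rule described in this case (the parse_single_level helper is hypothetical, not the plug-in's code); note how a nested value, such as the name object in the last sample record, is kept as a string:

```python
import json

# Sketch of single-level JSON parsing: first-level fields listed in
# serializer.fieldnames are mapped; nested values are kept as strings.
def parse_single_level(line, fieldnames):
    obj = json.loads(line)
    record = {}
    for name in fieldnames:
        if name in obj:
            value = obj[name]
            # nested content at the first level is treated as a string
            if isinstance(value, (dict, list)):
                value = json.dumps(value)
            record[name] = value
    return record

row = parse_single_level(
    '{"gender":true,"name":{"a":"x"},"id":8}',
    ["id", "name", "gender", "salary"],
)
```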
Schema of a DataHub topic
The following table describes the schema of the DataHub topic that corresponds to the preceding data.
Field name | Data type |
---|---|
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
Plug-in configuration file
Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.json
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = JSON
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the plug-in
The -Dflume.root.logger=INFO,console option can be used to print logs to the console in real time. If you require more information, you can use the debugging mode. Run the following commands to start the DataHub plug-in for Flume to write the JSON file to DataHub:
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
Use cases of sources
Case 1
A DataHub source can read data from DataHub and then write the data to another system in a reliable manner. This case shows you how to use this source type to export data to the destination console.
Schema of a DataHub topic
The following table describes the corresponding schema of the DataHub topic.
Field name | Data type |
---|---|
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
Plug-in configuration file
Create a file named datahub_source.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file:
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sources.r1.datahub.project = datahub_test
a1.sources.r1.datahub.topic = test_flume
a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
a1.sources.r1.serializer = DELIMITED
a1.sources.r1.serializer.delimiter = ,
a1.sources.r1.serializer.charset = UTF-8
a1.sources.r1.datahub.retryTimes = 3
a1.sources.r1.datahub.batchSize = 1000
a1.sources.r1.datahub.batchTimeout = 5
a1.sources.r1.datahub.enablePb = false
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the plug-in
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console
Flume metrics
The DataHub plug-in for Flume supports the built-in counting metrics of Flume. You can use the metrics to monitor the operations of the plug-in. Different metrics are supported for the sink and the source of the DataHub plug-in for Flume. The following tables describe the DataHub-related metrics. For more information about other metrics, see Available Component Metrics.
DatahubSink
Metric | Description |
---|---|
BatchEmptyCount | The number of times for which no data needs to be written to DataHub upon the expiration of the time to wait before data is written to DataHub. The time to wait takes effect only when the data amount does not reach the threshold specified by the batchSize parameter. |
BatchCompleteCount | The number of successful write operations where all the requested records are written to DataHub. |
EventDrainAttemptCount | The number of parsed records that the plug-in attempts to write to DataHub. |
BatchUnderflowCount | The number of times for which the amount of data written to DataHub is less than the amount of data to be written. In such a scenario, the data is parsed, but some or all of the data fails to be written to DataHub. |
EventDrainSuccessCount | The number of records that are written to DataHub. |
DatahubSource
Metric | Description |
---|---|
EventReceivedCount | The number of DataHub records received by the source. |
EventAcceptedCount | The number of DataHub records submitted to the channel by the source. |
Enable monitoring
Apache Flume provides various monitoring methods. This part describes how to enable HTTP monitoring. For more information about other monitoring methods, see Monitoring. To enable HTTP monitoring, add the following two options when you start the DataHub plug-in for Flume: -Dflume.monitoring.type=http -Dflume.monitoring.port=1234. A value of http for the type option indicates HTTP monitoring, and the port option specifies the port number. The following code provides an example on how to start the plug-in:
bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
After the plug-in is started, you can view the metrics in a browser. The URL is http://ip:1234/metrics.
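The HTTP endpoint serves the counters as a JSON object keyed by component type and name. The following Python sketch parses a made-up example payload (the counter values here are hypothetical, for illustration only):

```python
import json

# Hypothetical example of the JSON served at /metrics; Flume reports
# counter values as strings, keyed by "TYPE.componentName".
payload = """{
  "SINK.k1": {
    "Type": "SINK",
    "BatchCompleteCount": "42",
    "BatchEmptyCount": "3",
    "EventDrainSuccessCount": "4200"
  }
}"""

metrics = json.loads(payload)
sink = metrics["SINK.k1"]
drained = int(sink["EventDrainSuccessCount"])  # records written to DataHub
```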
FAQ
Q: What can I do if the "org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight" error is reported when I start the DataHub plug-in for Flume?
A: The default maximum heap memory of the DataHub plug-in for Flume is 20 MB. If the number of records to be written at a time is large, the heap memory used by the plug-in exceeds 20 MB.
Solution 1: Reduce the value of the batchSize parameter.
Solution 2: Increase the maximum heap memory of the DataHub plug-in for Flume.
$ vim bin/flume-ng
JAVA_OPTS="-Xmx20m" ==> JAVA_OPTS="-Xmx1024m"
Q: Does the DataHub plug-in for Flume support the JSON format?
A: Only single-level JSON parsing is supported by the JSON serializer. For nested or more complex formats, you can use custom regular expressions to parse data, or modify the code of the DataHub plug-in for Flume and add JSONEvent to support them.
Q: Does the DataHub plug-in for Flume support topics whose data type is BLOB?
A: No. The DataHub plug-in for Flume supports only topics whose data type is TUPLE.
Q: Why does the DataHub plug-in for Flume report the "org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count" error?
A: The Flume channel is full, and the source fails to write data to the Flume channel. You can modify the channel capacity in the configuration file or appropriately reduce the value of the batchSize parameter to resolve this issue.
Q: What can I do if the DataHub plug-in for Flume fails to be started because of a JAR package conflict when I use an earlier version of Apache Flume?
For example, when Apache Flume V1.6 is used, the java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader; error is reported. The plug-in and Apache Flume V1.6 depend on different versions of the Jackson JAR packages. Apache Flume V1.6 depends on an earlier version, so the methods provided by the later version cannot be found.
A: Delete the following three JAR packages in the ${FLUME_HOME}/lib directory:
jackson-annotations-2.3.0.jar
jackson-databind-2.3.1.jar
jackson-core-2.3.1.jar
Q: What can I do if empty strings are automatically converted to NULL when I use the DataHub plug-in for Flume to collect data?
A: In the DataHub plug-in for Flume V2.0.2, the trim() method is used for non-empty strings, and empty strings are directly converted to NULL. This logic is removed in the DataHub plug-in for Flume V2.0.3. Empty strings are retained after they are written to DataHub rather than converted to NULL.
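The difference between the two versions can be sketched as follows (an illustration of the described behavior, not the plug-in's actual code):

```python
# V2.0.2 behavior as described: empty strings become NULL, and other
# strings are trimmed before they are written to DataHub.
def convert_v202(value):
    return None if value == "" else value.strip()

# V2.0.3 behavior as described: empty strings are written as-is.
def convert_v203(value):
    return value
```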
Download earlier versions
Plug-in version | Apache Flume version (recommended) |
---|---|
2.0.3 | 1.9 |
2.0.4 | 1.9 |
2.0.8 | 1.9 |
2.0.3
The IDs of shards can be specified for data transfer.
Data types such as DECIMAL and TIMESTAMP in DataHub are supported.
Specific log information is fixed to facilitate troubleshooting.
DataHub sources are added so that the DataHub plug-in for Flume can be used to read data from DataHub.
Features such as data compression for transmission and binary transmission are provided.
The issue is fixed where an empty string is converted to NULL.
2.0.4
The issue that occurs when the value of fieldnames contains an empty field is fixed.
2.0.8
Data types including TINYINT, SMALLINT, INTEGER, and FLOAT in DataHub are supported.