
Last Updated: Aug 19, 2022

DataHub plug-in for Flume

Overview

The DataHub plug-in for Flume is a data subscription and publishing plug-in developed based on Apache Flume. It can write collected data to DataHub, and it can also read data from DataHub and write that data to other systems. The plug-in follows the development conventions of Apache Flume plug-ins and is easy to install. You can use it to publish data to DataHub and to subscribe to data in DataHub.

Install Apache Flume and the DataHub plug-in for Flume

Environment requirements

  1. Java Development Kit (JDK) V1.8 or later

  2. Apache Maven 3.x

  3. Flume-NG 1.x

Download Apache Flume

Download Apache Flume. Skip this step if you have already downloaded it. Then extract the archive:

$ tar zxvf apache-flume-1.9.0-bin.tar.gz

For convenience, ${FLUME_HOME} is used to indicate the home directory of Apache Flume in the following description.
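
For example, you can set ${FLUME_HOME} in your shell so that the commands in this topic can be copied as-is. The path below is only an assumption; replace it with the directory into which you extracted Apache Flume.

# Assumed installation path; adjust it to match your environment.
$ export FLUME_HOME=/opt/apache-flume-1.9.0-bin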

Install the DataHub plug-in for Flume

The plug-in must be installed on Linux.

Use the installation package

  1. Download the latest package of the DataHub plug-in for Flume. To download earlier versions of the plug-in, see the "Download earlier versions" section of this topic.

  2. Extract the DataHub plug-in for Flume from the package and place the plug-in in the ${FLUME_HOME}/plugins.d directory.

    $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
    $ cd aliyun-flume-datahub-sink-x.x.x
    $ mkdir ${FLUME_HOME}/plugins.d
    $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
Use the source code

  1. Download the source code of the DataHub plug-in for Flume from GitHub.

  2. Compile the code to install the plug-in.

    $ cd aliyun-maxcompute-data-collectors
    $ mvn clean package -DskipTests=true  -Dmaven.javadoc.skip=true
    $ cd flume-plugin/target
    $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
    $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
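
Regardless of which installation method you use, you can check that the plug-in directory is in place before starting Flume. The layout follows the standard Flume plugins.d convention; the exact JAR names depend on the plug-in version.

$ ls ${FLUME_HOME}/plugins.d/aliyun-flume-datahub-sink
# Expect a lib/ directory (and possibly libext/) that contains the plug-in JARs.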

Parameters

Parameters in a sink

| Parameter | Default value | Required | Description |
| --- | --- | --- | --- |
| datahub.endPoint | - | Yes | The endpoint of DataHub. |
| datahub.accessId | - | Yes | The AccessKey ID used to access DataHub. |
| datahub.accessKey | - | Yes | The AccessKey secret used to access DataHub. |
| datahub.project | - | Yes | The name of the DataHub project. |
| datahub.topic | - | Yes | The name of the DataHub topic. |
| datahub.shard.ids | IDs of all shards | No | The shards in DataHub to which data is written. Separate multiple IDs with commas (,), such as 0,1,2. Each time, a shard is randomly selected from the specified list and data is written to that shard. If this parameter is not set and shards are merged or split, the plug-in automatically adjusts the shard list. You can also modify the configuration file to adjust the shard list manually. |
| datahub.enablePb | true | No | Specifies whether to enable Protocol Buffers (Protobuf) for data transfer. If Protobuf is not supported in your Apsara Stack environment, set this parameter to false. |
| datahub.compressType | none | No | The compression format for data transfer. LZ4 and DEFLATE are supported. |
| datahub.batchSize | 1000 | No | The maximum number of records written to DataHub at a time. |
| datahub.maxBufferSize | 2*1024*1024 | No | The maximum amount of data written to DataHub at a time. Unit: bytes. We recommend that you do not modify this parameter. If the amount of data written at a time is too large, the write operation may fail. |
| datahub.batchTimeout | 5 | No | The time to wait before data is written to DataHub. Unit: seconds. This parameter takes effect only when the amount of data has not reached the threshold specified by the batchSize parameter. |
| datahub.retryTimes | 3 | No | The maximum number of retries after a write failure. |
| datahub.retryInterval | 5 | No | The interval between two consecutive retries after a write failure. Unit: seconds. |
| datahub.dirtyDataContinue | true | No | Specifies whether to ignore dirty records. If set to true, dirty records are automatically written to the dirty-record file, with a comma (,) as the delimiter, and subsequent data processing is not affected. |
| datahub.dirtyDataFile | DataHub-Flume-dirty-file | No | The name of the file that stores dirty records. |
| serializer | - | Yes | The data parsing method. Valid values: DELIMITED, JSON, and REGEX. DELIMITED parses data based on the specified delimiter. JSON parses each row as single-level JSON; nested content is treated as a string. REGEX parses data based on the specified regular expression. |
| serializer.delimiter | , | No | The delimiter used to separate fields. To use special characters, enclose them in double quotation marks, such as "\t". |
| serializer.regex | (.*) | No | The regular expression used for data parsing. The value of each field is parsed into a group. |
| serializer.fieldnames | - | Yes | The mappings between input fields and DataHub fields. Input fields are identified by their order. To skip an input field, leave its position empty. For example, c1,c2,,c3 maps the first, second, and fourth input fields to the c1, c2, and c3 fields in DataHub. |
| serializer.charset | UTF-8 | No | The encoding format for data parsing. |
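
The serializer.fieldnames mapping can skip input fields by leaving their positions empty. The following sketch assumes a hypothetical sink named k1 whose delimited input rows contain four columns, the third of which should not be written to DataHub.

# Hypothetical mapping: the third input column is skipped.
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,,c3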

Parameters in a source

| Parameter | Default value | Required | Description |
| --- | --- | --- | --- |
| datahub.endPoint | - | Yes | The endpoint of DataHub. |
| datahub.accessId | - | Yes | The AccessKey ID used to access DataHub. |
| datahub.accessKey | - | Yes | The AccessKey secret used to access DataHub. |
| datahub.project | - | Yes | The name of the DataHub project. |
| datahub.topic | - | Yes | The name of the DataHub topic. |
| datahub.subId | - | Yes | The ID of the DataHub subscription. |
| datahub.startTime | - | No | The point in time from which data is read from DataHub, in the format yyyy-MM-dd HH:mm:ss. If you set this parameter, the subscription offset is reset, and data is then read based on the subscription. |
| datahub.shard.ids | - | No | The shards in DataHub from which data is read. Separate multiple IDs with commas (,). Example: 0,1,2. Each time, a shard is randomly selected from the specified list and data is read from that shard. If this parameter is not set, collaborative consumption is used for reads. We recommend that you do not set this parameter: when multiple sources are configured and the parameter is not set, collaborative consumption automatically allocates shards so that the load is balanced across sources. |
| datahub.enablePb | true | No | Specifies whether to enable Protobuf for data transfer. If Protobuf is not supported in your Apsara Stack environment, set this parameter to false. |
| datahub.compressType | none | No | The compression format for data transfer. LZ4 and DEFLATE are supported. |
| datahub.batchSize | 1000 | No | The maximum number of records read from DataHub at a time. |
| datahub.batchTimeout | 5 | No | The time to wait before data is read from DataHub. Unit: seconds. This parameter takes effect only when the amount of data has not reached the threshold specified by the batchSize parameter. |
| datahub.retryTimes | 3 | No | The maximum number of retries after a read failure. The interval between two consecutive retries is fixed at 1 second and cannot be changed. |
| datahub.autoCommit | true | No | Specifies whether the consumer automatically commits the consumption offset. If set to true, the offset is committed automatically, so an offset may be committed before the corresponding data is actually consumed. If set to false, the offset is committed only after the data has been submitted to the Flume channel. |
| datahub.offsetCommitTimeout | 30 | No | The interval at which the consumer automatically commits the consumption offset. Unit: seconds. |
| datahub.sessionTimeout | 60 | No | The session timeout period. Sources work in collaborative consumption mode. If collaborative consumption times out and no heartbeat message is sent, the session is automatically closed. |
| serializer | - | Yes | The data parsing method. Set the value to DELIMITED. The fields in a row follow the order defined by the DataHub schema and are separated by the specified delimiter. |
| serializer.delimiter | , | No | The delimiter used to separate fields. To use special characters, enclose them in double quotation marks, such as "\t". |
| serializer.charset | UTF-8 | No | The encoding format for data parsing. |
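
The datahub.startTime parameter can be used to replay data from a given point in time. The following sketch assumes a hypothetical source named r1, and the timestamp is only an example; note that setting this parameter resets the subscription offset.

# Hypothetical: reset the subscription offset and read from this point in time onward.
a1.sources.r1.datahub.startTime = 2022-01-01 00:00:00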

Use cases of sinks

Case 1: DELIMITED serializer

If you select DELIMITED as the data parsing method, each row is regarded as a record, and the data is parsed based on the specified delimiter. This case shows you how to use the DataHub plug-in for Flume to write a CSV file that contains multiple records to DataHub in near real time.

Sample data

Save the following data as the test.csv file in the local directory /temp/:

0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

Schema of a DataHub topic

The following table describes the schema of the DataHub topic that corresponds to the preceding data.

| Field name | Data type |
| --- | --- |
| id | BIGINT |
| name | STRING |
| gender | BOOLEAN |
| salary | DOUBLE |
| my_time | TIMESTAMP |
| decimal | DECIMAL |

Plug-in configuration file

Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.

Note: Data may be lost when an exec source is used, because an exec source cannot guarantee that events are delivered to the Flume channel. For example, if you run the tail command to collect data and the Flume channel is full, the collected data is lost. We recommend that you use a spooling directory source or a taildir source instead; Case 3 describes a taildir source. The static file test.csv in the /temp/ directory is used as the data source in this case. If log data is dynamically written to the file, you can run the tail -F logFile command to collect it in real time.

# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.csv
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the plug-in

The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed logs, you can use the DEBUG level instead. Run the following commands to start the DataHub plug-in for Flume and write the CSV file to DataHub:

$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Case 2: REGEX serializer

If you select REGEX as the data parsing method, each row is regarded as a record and is parsed based on the specified regular expression, with each field captured as a group. This case shows you how the DataHub plug-in for Flume uses a regular expression to write a CSV file to DataHub in near real time.

Sample data

Save the following data as the test.csv file in the local directory /temp/. The data to be synchronized is the content after the dates.

[2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
[2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
[2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
[2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
[2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
[2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
[2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
[2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
[2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
[2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
[2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
[2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938

Schema of a DataHub topic

The following table describes the schema of the DataHub topic that corresponds to the preceding data.

| Field name | Data type |
| --- | --- |
| id | BIGINT |
| name | STRING |
| gender | BOOLEAN |
| salary | DOUBLE |
| my_time | TIMESTAMP |
| decimal | DECIMAL |

Plug-in configuration file

Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.

Note: Data may be lost when an exec source is used, because an exec source cannot guarantee that events are delivered to the Flume channel. For example, if you run the tail command to collect data and the Flume channel is full, the collected data is lost. We recommend that you use a spooling directory source or a taildir source instead; Case 3 describes a taildir source. The static file test.csv in the /temp/ directory is used as the data source in this case. If log data is dynamically written to the file, you can run the tail -F logFile command to collect it in real time.

# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.csv
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = REGEX
a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
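
Before starting the agent, you can roughly verify that the regular expression matches the sample rows. The following check is only an approximation that assumes a grep build with PCRE support (-P); the authoritative behavior is the Java regular expression configured above.

# Rough check: count the sample rows that match the pattern (requires grep -P).
$ grep -cP '\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\] (\d+),(\S+),([a-z]+),([-+]?[0-9]*\.?[0-9]*),(\d+),([-+]?[0-9]*\.?[0-9]*)' /temp/test.csv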

Start the plug-in

The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed logs, you can use the DEBUG level instead. Run the following commands to start the DataHub plug-in for Flume and write the CSV file to DataHub:

$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Case 3: Flume taildir source

As mentioned in the preceding parts, when the DataHub plug-in for Flume uses an exec source, data may be lost. Therefore, we recommend that you do not use exec sources in actual production environments. To collect local logs, you can use a taildir source or a spooling directory source. This case shows you how to use a taildir source to collect log data. If you use a taildir source, you can specify a file group. The taildir source observes the specified files and reads the new rows that are appended to the files in near real time. If new rows are being written, the taildir source retries reading the rows until the write operation is complete. The taildir source stores the last read position of each file in the positionFile file in the JSON format. The recorded read positions are not updated if the source event fails to be submitted to the Flume channel. Therefore, taildir sources are reliable.
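
The following line illustrates the kind of entry that the position file contains for each tracked file; the inode, offset, and path values are illustrative only.

[{"inode":2496001,"pos":1024,"file":"/temp/app.log"}]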

Sample data

All log data is appended to the source file in the following format, and the log file name matches the pattern *.log.

0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289

Schema of a DataHub topic

The following table describes the schema of the DataHub topic that corresponds to the preceding data.

| Field name | Data type |
| --- | --- |
| id | BIGINT |
| name | STRING |
| gender | BOOLEAN |
| salary | DOUBLE |
| my_time | TIMESTAMP |
| decimal | DECIMAL |

Plug-in configuration file

Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file.

# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /temp/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /temp/.*log
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the plug-in

The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed logs, you can use the DEBUG level instead. Run the following commands to start the DataHub plug-in for Flume and write the collected log data to DataHub:

$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Case 4: JSON serializer

If you select JSON as the data parsing method, each row is regarded as a record and is parsed as single-level JSON: the nested content of a first-level field is treated as a string. If a first-level field appears in the mappings specified by the serializer.fieldnames parameter, it is mapped to the corresponding field in the destination DataHub topic. This case shows you how the DataHub plug-in for Flume uses the JSON parsing method to write a log file to DataHub in near real time.

Sample data

Save the following data as the test.json file in the local directory /temp/:

{"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
null
null
null
null
null
null
null
null
{"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}

Schema of a DataHub topic

The following table describes the schema of the DataHub topic that corresponds to the preceding data.

| Field name | Data type |
| --- | --- |
| id | BIGINT |
| name | STRING |
| gender | BOOLEAN |
| salary | DOUBLE |
| my_time | TIMESTAMP |
| decimal | DECIMAL |

Plug-in configuration file

Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.

# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.json
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = JSON
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the plug-in

The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed logs, you can use the DEBUG level instead. Run the following commands to start the DataHub plug-in for Flume and write the JSON file to DataHub:

$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Use cases of sources

Case 1

A DataHub source can read data from DataHub and then write the data to another system in a reliable manner. This case shows you how to use this source type to export data to the destination console.

Schema of a DataHub topic

The following table describes the corresponding schema of the DataHub topic.

| Field name | Data type |
| --- | --- |
| id | BIGINT |
| name | STRING |
| gender | BOOLEAN |
| salary | DOUBLE |
| my_time | TIMESTAMP |
| decimal | DECIMAL |

Plug-in configuration file

Create a file named datahub_source.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file:

 # A single-node Flume configuration for DataHub
 # Name the components on this agent
 a1.sources = r1
 a1.sinks = k1
 a1.channels = c1

 # Describe/configure the source
 a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
 a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
 a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
 a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
 a1.sources.r1.datahub.project = datahub_test
 a1.sources.r1.datahub.topic = test_flume
 a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
 a1.sources.r1.serializer = DELIMITED
 a1.sources.r1.serializer.delimiter = ,
 a1.sources.r1.serializer.charset = UTF-8
 a1.sources.r1.datahub.retryTimes = 3
 a1.sources.r1.datahub.batchSize = 1000
 a1.sources.r1.datahub.batchTimeout = 5
 a1.sources.r1.datahub.enablePb = false

 # Describe the sink
 a1.sinks.k1.type = logger

 # Use a channel which buffers events in memory
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 10000
 a1.channels.c1.transactionCapacity = 10000

 # Bind the source and sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1

Start the plug-in

$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console
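
If the source reads records successfully, the logger sink prints each event to the console. The output resembles the following line; the timestamp, log layout, and body content are illustrative, and by default the logger sink shows only the first bytes of each event body.

2022-08-19 12:00:00,000 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] org.apache.flume.sink.LoggerSink: Event: { headers:{} body: 30 2C 59 78 43 4F 48 58 63 73 74 31 4E 6C 4C 35 0,YxCOHXcst1NlL5 }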

Flume metrics

The DataHub plug-in for Flume supports the built-in counting metrics of Flume. You can use these metrics to monitor the operation of the plug-in. The sink and the source of the plug-in support different metrics. The following tables describe the DataHub-related metrics. For more information about other metrics, see Available Component Metrics.

DatahubSink

| Metric | Description |
| --- | --- |
| BatchEmptyCount | The number of times that no data needed to be written to DataHub when the wait time expired. The wait time takes effect only when the amount of data has not reached the threshold specified by the batchSize parameter. |
| BatchCompleteCount | The number of successful write operations in which all requested records were written to DataHub. |
| EventDrainAttemptCount | The number of parsed records that the plug-in attempted to write to DataHub. |
| BatchUnderflowCount | The number of times that the amount of data written to DataHub was less than the amount to be written. In this case, the data was parsed, but some or all of it failed to be written to DataHub. |
| EventDrainSuccessCount | The number of records that were successfully written to DataHub. |

DatahubSource

| Metric | Description |
| --- | --- |
| EventReceivedCount | The number of DataHub records received by the source. |
| EventAcceptedCount | The number of DataHub records submitted to the channel by the source. |

Enable monitoring

Apache Flume provides various monitoring methods. This section describes how to enable HTTP monitoring. For more information about other monitoring methods, see Monitoring. To enable HTTP monitoring, add the following two parameters when you start the DataHub plug-in for Flume: -Dflume.monitoring.type=http -Dflume.monitoring.port=1234. Setting the type parameter to http enables HTTP monitoring, and the port parameter specifies the port number. The following command shows how to start the plug-in:

bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234

After the plug-in is started, you can open http://ip:1234/metrics in a browser to view the metrics.
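
The endpoint returns a JSON document keyed by component name. The following excerpt is illustrative: the counter values are examples, and the exact set of fields depends on the Flume version.

{
  "SINK.k1": {
    "Type": "SINK",
    "BatchCompleteCount": "133",
    "BatchEmptyCount": "2",
    "BatchUnderflowCount": "0",
    "EventDrainAttemptCount": "13300",
    "EventDrainSuccessCount": "13300"
  },
  "CHANNEL.c1": {
    "Type": "CHANNEL",
    "ChannelSize": "0",
    "EventPutSuccessCount": "13300",
    "EventTakeSuccessCount": "13300"
  }
}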

FAQ

  • Q: What can I do if the "org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight" error is reported when I start the DataHub plug-in for Flume?

  • A: The default maximum heap memory of the DataHub plug-in for Flume is 20 MB. If the number of records written at a time is large, the heap memory used by the plug-in exceeds 20 MB. You can resolve the issue in either of the following ways:

    1. Solution 1: Reduce the value of the batchSize parameter.

    2. Solution 2: Increase the maximum heap memory of the DataHub plug-in for Flume.

      $ vim bin/flume-ng
      JAVA_OPTS="-Xmx20m"  ==>  JAVA_OPTS="-Xmx1024m"

  • Q: Does the DataHub plug-in for Flume support the JSON format?

  • A: No. However, you can use custom regular expressions to parse data, or modify the code of the DataHub plug-in for Flume and add JSONEvent to support the JSON format.

  • Q: Does the DataHub plug-in for Flume support topics whose data type is BLOB?

  • A: No. The DataHub plug-in for Flume supports only topics whose data type is TUPLE.

  • Q: Why does the DataHub plug-in for Flume report the "org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count" error?

  • A: The Flume channel is full, and the source fails to write data to the Flume channel. You can modify the channel capacity in the configuration file or appropriately reduce the value of the batchSize parameter to resolve this issue.

  • Q: What can I do if the DataHub plug-in for Flume fails to start because of a JAR package conflict when I use an earlier version of Apache Flume?

For example, when Apache Flume V1.6 is used, the following error is reported: java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;. The plug-in and Apache Flume V1.6 depend on different versions of the Jackson JAR packages. Apache Flume V1.6 bundles an earlier version, so the method that the plug-in calls cannot be found.

  • A: Delete the following three JAR packages from the ${FLUME_HOME}/lib directory:

    • jackson-annotations-2.3.0.jar

    • jackson-databind-2.3.1.jar

    • jackson-core-2.3.1.jar

  • Q: What can I do if empty strings are automatically converted to NULL when I use the DataHub plug-in for Flume to collect data?

  • A: In the DataHub plug-in for Flume V2.0.2, the trim() method is used for non-empty strings, and empty strings are directly converted to NULL. This logic is removed in the DataHub plug-in for Flume V2.0.3. Empty strings are retained after they are written to DataHub rather than converted to NULL.

Download earlier versions

| Plug-in version | Apache Flume version (recommended) |
| --- | --- |
| 2.0.3 | 1.9 |
| 2.0.4 | 1.9 |
| 2.0.8 | 1.9 |

2.0.3

  • The IDs of shards can be specified for data transfer.

  • Data types such as DECIMAL and TIMESTAMP in DataHub are supported.

  • Some log messages are fixed to facilitate troubleshooting.

  • DataHub sources are added so that the DataHub plug-in for Flume can be used to read data from DataHub.

  • Features such as data compression for transmission and binary transmission are provided.

  • The issue is fixed where an empty string is converted to NULL.

2.0.4

  • The issue that occurs when the value of fieldnames contains an empty field is fixed.

2.0.8

  • Data types including TINYINT, SMALLINT, INTEGER, and FLOAT in DataHub are supported.