DataHub plug-in for Flume

Last Updated: Jan 24, 2025

The DataHub plug-in for Flume is a plug-in for data subscription and publishing that is developed based on Apache Flume. The plug-in can write collected data to DataHub, and it can read data from DataHub and write that data to other systems. The plug-in complies with the development conventions of Apache Flume plug-ins and is easy to install. You can use it to publish data to DataHub and subscribe to data in DataHub.

Install the DataHub plug-in for Flume

Installation requirements

  • The version of Java Development Kit (JDK) must be 1.8 or later.

  • The version of Apache Maven must be 3.X.

  • The version of Flume-NG must be 1.X.

Install the DataHub plug-in for Flume

  1. Download Apache Flume. Skip this step if Apache Flume is already installed.

    $ tar zxvf apache-flume-1.11.0-bin.tar.gz
    Note

    For ease of illustration, ${FLUME_HOME} is used in the following information to specify the home directory of Apache Flume.
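
    For example, you can set this variable in your shell before you run the following commands. The path below is only an illustration and depends on where you extracted the package:

    $ export FLUME_HOME=/path/to/apache-flume-1.11.0-bin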

  2. Install the DataHub plug-in for Flume.

    • Directly install the DataHub plug-in for Flume.

      1. Download the DataHub plug-in for Flume.

      2. Extract the DataHub plug-in for Flume from the package and save the plug-in in the ${FLUME_HOME}/plugins.d directory.

        $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ cd aliyun-flume-datahub-sink-x.x.x
        $ mkdir ${FLUME_HOME}/plugins.d
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
    • Use source code to install the DataHub plug-in for Flume.

      1. Download the source code from aliyun-maxcompute-data-collectors.

      2. Compile the source code to install the plug-in.

        $ cd aliyun-maxcompute-data-collectors
        $ mvn clean package -DskipTests=true  -Dmaven.javadoc.skip=true
        $ cd flume-plugin/target
        $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d

Parameters

Sink-related parameters

| Parameter | Default value | Required | Description |
| --- | --- | --- | --- |
| datahub.endPoint | - | Yes | The endpoint of DataHub. |
| datahub.accessId | - | Yes | The AccessKey ID of your Alibaba Cloud account. |
| datahub.accessKey | - | Yes | The AccessKey secret of your Alibaba Cloud account. |
| datahub.project | - | Yes | The name of the DataHub project. |
| datahub.topic | - | Yes | The name of the DataHub topic. |
| datahub.shard.ids | IDs of all shards | No | The IDs of the DataHub shards to which data is written. Separate multiple IDs with commas (,), such as 0,1,2. Each time data is written, a shard is randomly selected from the specified list. If this parameter is not set, the plug-in automatically adjusts the shard list when shards are merged or split. You can also adjust the shard list by modifying the configuration file. |
| datahub.enablePb | true | No | Specifies whether to enable Protobuf for data transfer. If Protobuf is not supported in your Apsara Stack environment, you must set this parameter to false. |
| datahub.compressType | none | No | Specifies whether to enable compression for data transfer. The LZ4 and DEFLATE compression formats are supported. |
| datahub.batchSize | 1000 | No | The maximum number of records that can be written to DataHub at a time. |
| datahub.maxBufferSize | 2*1024*1024 | No | The maximum amount of data that can be written to DataHub at a time. Unit: bytes. We recommend that you do not increase this value. If a large amount of data is written at a time, the write operation may fail. |
| datahub.batchTimeout | 5 | No | The time to wait before data is written to DataHub. Unit: seconds. This parameter takes effect only when the amount of data does not reach the threshold specified by the batchSize parameter. |
| datahub.retryTimes | 3 | No | The maximum number of retries allowed after a write failure. |
| datahub.retryInterval | 5 | No | The interval between two consecutive retries after a write failure. Unit: seconds. |
| datahub.dirtyDataContinue | true | No | Specifies whether to ignore dirty records. If you set this parameter to true, dirty records are automatically written, separated by commas (,), to the file that stores dirty records. This does not affect subsequent data processing. |
| datahub.dirtyDataFile | DataHub-Flume-dirty-file | No | The name of the file that stores dirty records. |
| serializer | - | Yes | The data parsing method. Valid values: DELIMITED, JSON, and REGEX. DELIMITED parses each row based on the specified delimiter. JSON parses each row as a single-level JSON object. REGEX parses each row based on the specified regular expression. |
| serializer.delimiter | , | No | The delimiter that is used to separate fields. To use special characters, enclose them in double quotation marks (" "), such as "\t". |
| serializer.regex | (.*) | No | The regular expression used for data parsing. The value of each field is parsed into a capture group. |
| serializer.fieldnames | - | Yes | The mappings between the input fields and the DataHub fields. The input fields are mapped in the order in which they appear. To skip an input field, leave its position empty. For example, c1,c2,,c4 maps the first, second, and fourth input fields to the c1, c2, and c4 fields in DataHub. |
| serializer.charset | UTF-8 | No | The encoding format for data parsing. |
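
The following snippet sketches how these serializer parameters could be combined in a sink configuration. It is only an illustration: the agent and sink names (a1, k1) follow the examples later in this topic, the DataHub field names c1, c2, and c4 are hypothetical, and the empty third position means that the third input field is not written to DataHub.

# Hypothetical mapping: skip the third input field, map the others to c1, c2, and c4
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,,c4
a1.sinks.k1.serializer.charset = UTF-8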

Source-related parameters

| Parameter | Default value | Required | Description |
| --- | --- | --- | --- |
| datahub.endPoint | - | Yes | The endpoint of DataHub. |
| datahub.accessId | - | Yes | The AccessKey ID of your Alibaba Cloud account. |
| datahub.accessKey | - | Yes | The AccessKey secret of your Alibaba Cloud account. |
| datahub.project | - | Yes | The name of the DataHub project. |
| datahub.topic | - | Yes | The name of the DataHub topic. |
| datahub.subId | - | Yes | The ID of the subscription to the DataHub topic. |
| datahub.startTime | - | No | The point in time from which data is read from DataHub, in the format of yyyy-MM-dd HH:mm:ss. If you set this parameter, the subscription offset is reset, and data is then read based on the subscription. |
| datahub.shard.ids | - | No | The IDs of the DataHub shards from which data is read. Separate multiple IDs with commas (,), such as 0,1,2. Each time data is read, a shard is randomly selected from the specified list. If this parameter is left empty, collaborative consumption is used for read operations. We recommend that you do not set this parameter. If you configure multiple sources without setting this parameter, collaborative consumption automatically allocates shards so that the load is balanced across sources. |
| datahub.enablePb | true | No | Specifies whether to enable Protobuf for data transfer. If Protobuf is not supported in your Apsara Stack environment, you must set this parameter to false. |
| datahub.compressType | none | No | Specifies whether to enable compression for data transfer. The LZ4 and DEFLATE compression formats are supported. |
| datahub.batchSize | 1000 | No | The maximum number of records that can be read from DataHub at a time. |
| datahub.batchTimeout | 5 | No | The time to wait before data is read from DataHub. Unit: seconds. This parameter takes effect only when the amount of data does not reach the threshold specified by the batchSize parameter. |
| datahub.retryTimes | 3 | No | The maximum number of retries allowed after a read failure. The interval between two consecutive retries is fixed at 1 second and cannot be changed. |
| datahub.autoCommit | true | No | Specifies whether the consumer automatically commits the consumption offset. If you set this parameter to true, the offset is committed automatically, so an offset may be committed even though the corresponding data has not been consumed. If you set this parameter to false, the offset is committed only after the data is submitted to the Flume channel. |
| datahub.offsetCommitTimeout | 30 | No | The interval at which the consumer automatically commits the consumption offset. Unit: seconds. |
| datahub.sessionTimeout | 60 | No | The session timeout period. Sources work in collaborative consumption mode. If a collaborative consumption session times out and no heartbeat message is sent, the session is automatically canceled. |
| serializer | - | Yes | The data parsing method. Set the value to DELIMITED. The fields within a row are written in the order specified by the DataHub schema and are separated by the specified delimiter. |
| serializer.delimiter | , | No | The delimiter that is used to separate fields. To use special characters, enclose them in double quotation marks (" "), such as "\t". |
| serializer.charset | UTF-8 | No | The encoding format for data parsing. |

Case description

Sink use cases

Case 1: DELIMITED serializer

  1. Prepare test data.

    If you select DELIMITED as the data parsing method, each row is regarded as a record and the data is parsed based on the specified delimiter. This case shows how to use the DataHub plug-in for Flume to write a CSV file to DataHub in near real time. Save the following data as a file named test.csv in the local directory /temp/:

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
    2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
    3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
    4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
    5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
    6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
    7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
    8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

    The following table describes the schema of the DataHub topic that corresponds to the preceding data.

    | Field name | Data type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure Apache Flume files.

    Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    Note

    Data may be lost when an exec source is used because the exec source cannot ensure that the source event is submitted to the Flume channel. For example, when you run the tail command to collect specific data and the Flume channel is full, the collected data will be lost. We recommend that you use Spooling Directory Source or Taildir Source. The static file test.csv in the /temp/ directory is used as the data source in this case. If log data is dynamically written to the file, you can run the tail -F logFile command to collect log data in real time.

  3. Start the DataHub plug-in for Flume.

    The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed output, you can use the debug logging mode. Run the following commands to start the DataHub plug-in for Flume and write the CSV file to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Case 2: REGEX serializer

  1. Prepare test data.

    If you select REGEX as the data parsing method, each row is regarded as a record and the data is parsed based on the specified regular expression. The content of a record is expressed in multiple groups. This case shows you how the DataHub plug-in for Flume uses a regular expression to write a CSV file to DataHub in near real time. Save the following data as a file named test.csv in the local directory /temp/.

    [2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
    [2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
    [2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
    [2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
    [2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
    [2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
    [2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
    [2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
    [2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
    [2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
    [2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
    [2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938

    The following table describes the schema of the DataHub topic that corresponds to the preceding data.

    | Field name | Data type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure Apache Flume files.

    Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = REGEX
    a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    Note

    Data may be lost when an exec source is used because the exec source cannot ensure that the source event is submitted to the Flume channel. For example, when you run the tail command to collect specific data and the Flume channel is full, the collected data will be lost. We recommend that you use Spooling Directory Source or Taildir Source. The static file test.csv in the /temp/ directory is used as the data source in this case. If log data is dynamically written to the file, you can run the tail -F logFile command to collect log data in real time.

  3. Start the DataHub plug-in for Flume.

    The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed output, you can use the debug logging mode. Run the following commands to start the DataHub plug-in for Flume and write the CSV file to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Case 3: Flume taildir source

As mentioned in the preceding parts, when the DataHub plug-in for Flume uses an exec source, data may be lost. Therefore, we recommend that you do not use exec sources in actual production environments. To collect local logs, you can use Taildir Source or Spooling Directory Source. This case shows you how to use a taildir source to collect log data. The taildir source observes the specified files and reads the new rows that are added to the files in near real time. If new rows are being written, the taildir source retries reading the rows until the write operation is complete. The taildir source stores the last read position of each file in the positionFile file in the JSON format. The recorded read positions are not updated if the source event fails to be submitted to the Flume channel. Therefore, taildir sources are reliable.
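
As a rough illustration, the position file maintained by a taildir source is a JSON array that records, for each tracked file, its inode, the last read offset, and the file path. The values below are made up and only show the general shape of the file:

[{"inode": 2496275, "pos": 1024, "file": "/temp/app.log"}]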

  1. Prepare test data.

    Append log data to the end of a log file in the following format. The log file name must match the *.log pattern.

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289

    The following table describes the schema of the DataHub topic that corresponds to the preceding data.

    | Field name | Data type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure Apache Flume files.

    Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /temp/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /temp/.*log
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Start the DataHub plug-in for Flume.

    The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed output, you can use the debug logging mode. Run the following commands to start the DataHub plug-in for Flume and write the log file to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Case 4: JSON serializer

If you select JSON as the data parsing method, each row is regarded as a record in the form of a single-level JSON object. The nested content of a first-level element is treated as a string. If a first-level key matches a field name specified by the serializer.fieldnames parameter, its value is mapped to the corresponding field in the destination DataHub topic. This case shows how the DataHub plug-in for Flume uses the JSON parsing method to write a log file to DataHub in near real time.

  1. Prepare test data.

    Save the following data as a file named test.json in the local directory /temp/:

    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V","id":1,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l","id":2,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs","id":3,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI","id":4,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV","id":5,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX","id":6,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8","id":7,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}

    The following table describes the schema of the DataHub topic that corresponds to the preceding data.

    | Field name | Data type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure Apache Flume files.

    Create a file named datahub_basic.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file. This case uses an exec source. For more information about other sources, see Flume 1.9.0 User Guide.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.json
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = JSON
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Start the DataHub plug-in for Flume.

    The -Dflume.root.logger=INFO,console option prints logs to the console in real time. If you need more detailed output, you can use the debug logging mode. Run the following commands to start the DataHub plug-in for Flume and write the JSON file to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Source use cases

Read data from DataHub to other systems

DataHub-Flume Source can read data from DataHub and then write the data to another system in a reliable manner. This case shows you how to use DataHub-Flume Source to export log data to the destination console.

  1. The following table describes the schema of the DataHub topic from which data is to be read.

    | Field name | Data type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure Apache Flume files.

    Create a file named datahub_source.conf in the ${FLUME_HOME}/conf directory, and enter the following content in the file:

     # A single-node Flume configuration for DataHub
     # Name the components on this agent
     a1.sources = r1
     a1.sinks = k1
     a1.channels = c1
    
     # Describe/configure the source
     a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
     a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
     a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
     a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
     a1.sources.r1.datahub.project = datahub_test
     a1.sources.r1.datahub.topic = test_flume
     a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
     a1.sources.r1.serializer = DELIMITED
     a1.sources.r1.serializer.delimiter = ,
     a1.sources.r1.serializer.charset = UTF-8
     a1.sources.r1.datahub.retryTimes = 3
     a1.sources.r1.datahub.batchSize = 1000
     a1.sources.r1.datahub.batchTimeout = 5
     a1.sources.r1.datahub.enablePb = false
    
     # Describe the sink
     a1.sinks.k1.type = logger
    
     # Use a channel which buffers events in memory
     a1.channels.c1.type = memory
     a1.channels.c1.capacity = 10000
     a1.channels.c1.transactionCapacity = 10000
    
     # Bind the source and sink to the channel
     a1.sources.r1.channels = c1
     a1.sinks.k1.channel = c1
  3. Start the DataHub plug-in for Flume.

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console

Flume metrics

The DataHub plug-in for Flume supports the built-in counting metrics of Flume. You can use these metrics to monitor the operations of the plug-in. The sinks and sources of the DataHub plug-in for Flume support different metrics. The following tables describe the DataHub-related metrics. For more information about other metrics, see Available Component Metrics.

DatahubSink

| Metric | Description |
| --- | --- |
| BatchEmptyCount | The number of times that no data needed to be written to DataHub when the wait time expired. The wait time takes effect only when the amount of data does not reach the threshold specified by the batchSize parameter. |
| BatchCompleteCount | The number of successful write operations in which all requested records were written to DataHub. |
| EventDrainAttemptCount | The number of parsed records that the plug-in attempted to write to DataHub. |
| BatchUnderflowCount | The number of times that the amount of data written to DataHub was less than the amount of data to be written. In such cases, the data was parsed, but some or all of it failed to be written to DataHub. |
| EventDrainSuccessCount | The number of records that were written to DataHub. |

DatahubSource

| Metric | Description |
| --- | --- |
| EventReceivedCount | The number of DataHub records received by the source. |
| EventAcceptedCount | The number of DataHub records submitted to the channel by the source. |

Flume monitoring

Apache Flume provides various monitoring methods. This part describes how to enable HTTP monitoring. For more information about other monitoring methods, see Monitoring. To enable HTTP monitoring, add the following two parameters when you start the DataHub plug-in for Flume: -Dflume.monitoring.type=http and -Dflume.monitoring.port=1234. The value http of the -Dflume.monitoring.type parameter indicates HTTP monitoring, and the value 1234 of the -Dflume.monitoring.port parameter indicates the port number. The following code provides an example on how to start the plug-in:

bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234

After the plug-in is started, you can view the metrics on the web page at http://<ip>:1234/metrics.
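
For example, assuming the agent runs on the local host with the port configured above, you could query the metrics endpoint as follows; the host name and port are placeholders, and the response is a JSON document that contains counters such as the ones listed above:

curl http://localhost:1234/metrics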

Note

For more information about other monitoring methods, see Flume 1.9.0 User Guide.

FAQ

What can I do if the "org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight" error is reported when I start the DataHub plug-in for Flume?

The default heap memory of the DataHub plug-in for Flume is 20 MB. If the specified number of records to be written at a time is large, the heap memory used by the DataHub plug-in for Flume may exceed 20 MB. You can use one of the following solutions to resolve the issue:

Solution 1: Reduce the value of the batchSize parameter.

Solution 2: Increase the maximum heap memory of the DataHub plug-in for Flume.

  • $ vim bin/flume-ng

  • JAVA_OPTS="-Xmx20m" ==> JAVA_OPTS="-Xmx1024m"

Does the DataHub plug-in for Flume support the JSON format?

No. However, you can use custom regular expressions to parse data, or modify the code of the DataHub plug-in for Flume and add JSONEvent to support the JSON format.

Does the DataHub plug-in for Flume support topics whose data type is BLOB?

No. The DataHub plug-in for Flume supports only topics whose data type is TUPLE.

Why does the DataHub plug-in for Flume report the "org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count" error?

The Flume channel is full, and the source fails to write data to the Flume channel. To resolve this issue, increase the channel capacity in the configuration file and, if necessary, reduce the value of the batchSize parameter.
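
A minimal sketch of such an adjustment, assuming the agent, channel, and sink names used in the earlier examples (a1, c1, and k1); the exact values are illustrative and depend on your workload and memory budget:

# Illustrative values: enlarge the memory channel and write smaller batches
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 20000
a1.sinks.k1.datahub.batchSize = 100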

What can I do if the DataHub plug-in for Flume fails to start because of a JAR package conflict when I use an earlier version of Apache Flume?

  • For example, when Apache Flume V1.6 is used, the java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader; error is reported. The plug-in and Apache Flume V1.6 depend on different versions of the Jackson JAR packages. Apache Flume V1.6 ships an earlier version, so the methods required by the plug-in cannot be found.

  • Delete the conflicting Jackson JAR packages in the ${FLUME_HOME}/lib directory, for example:

    • jackson-annotations-2.3.0.jar

    • jackson-databind-2.3.1.jar

What can I do if empty strings are automatically converted to NULL when I use the DataHub plug-in for Flume to collect data?

In the DataHub plug-in for Flume V2.0.2, the trim() method is used for non-empty strings, and empty strings are directly converted to NULL. This logic is removed in the DataHub plug-in for Flume V2.0.3. Empty strings are retained rather than converted to NULL after they are written to DataHub.

What can I do if the "Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because "com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null" error is reported?

Delete the guava.jar and zstd.jar files in the lib directory of Flume.