DataHub:Flume plug-in

Last Updated:Mar 12, 2026

The Flume-DataHub plugin is a data subscription and publishing plugin for DataHub that is built on Flume. You can use it to write collected data to DataHub, or to read data from DataHub and write it to other systems. The plugin complies with Flume plugin development standards and is easy to install.

Install the Flume plugin

Installation restrictions

  • JDK 1.8 or later.

  • Apache Maven version 3.x.

  • Flume-NG version 1.x.

Install Flume

  1. Download and extract Flume. You can skip this step if Flume is already installed.

    $ tar zxvf apache-flume-1.11.0-bin.tar.gz
    Note

    In this document, ${FLUME_HOME} refers to the Flume home directory.

  2. Install Flume-DataHub.

    • Direct installation

      1. Download the Flume-DataHub plugin.

      2. Extract the Flume-DataHub plugin and move it to the ${FLUME_HOME}/plugins.d directory.

        $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ cd aliyun-flume-datahub-sink-x.x.x
        $ mkdir -p ${FLUME_HOME}/plugins.d
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
    • Installation from source code

      1. Download the source code from aliyun-maxcompute-data-collectors.

      2. Compile and install.

        $ cd aliyun-maxcompute-data-collectors
        $ mvn clean package -DskipTests=true  -Dmaven.javadoc.skip=true
        $ cd flume-plugin/target
        $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
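After either installation method, a quick check that the plugin directory landed where the steps above put it can save a failed agent start. The following is a minimal sketch; the function name `check_plugin` is hypothetical, and it only tests for the `aliyun-flume-datahub-sink` directory under `plugins.d`, not for the contents of the plugin.

```shell
# Hypothetical helper: report whether the plugin directory exists under plugins.d.
check_plugin() {
  # $1 = Flume home directory (the ${FLUME_HOME} used in this document)
  plugin_dir="$1/plugins.d/aliyun-flume-datahub-sink"
  if [ -d "$plugin_dir" ]; then
    echo "plugin found: $plugin_dir"
  else
    echo "plugin missing: $plugin_dir" >&2
    return 1
  fi
}

# Usage (assumes FLUME_HOME is set in your shell):
#   check_plugin "${FLUME_HOME}"
```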

Parameter reference

Sink parameters

| Name | Default value | Required | Description |
| --- | --- | --- | --- |
| datahub.endPoint | - | Required | The service endpoint of Alibaba Cloud DataHub. |
| datahub.accessId | - | Required | Your Alibaba Cloud AccessKey ID. |
| datahub.accessKey | - | Required | Your Alibaba Cloud AccessKey secret. |
| datahub.project | - | Required | The name of the DataHub project. |
| datahub.topic | - | Required | The name of the DataHub topic. |
| datahub.shard.ids | All shards | Optional | A comma-separated list of the shard IDs to write to, such as 0,1,2. For each write, a shard is randomly selected from the list. If this parameter is not specified, Flume automatically adjusts the shard list after a shard split or merge. If it is specified, you must update the configuration file manually after a split or merge. |
| datahub.enablePb | true | Optional | Specifies whether to use Protocol Buffers (PB) for data transmission. Some Apsara Stack environments do not support PB. In these environments, set this parameter to false. |
| datahub.compressType | none | Optional | The compression type for data transmission. Supported compression types are LZ4 and DEFLATE. |
| datahub.batchSize | 1000 | Optional | The maximum number of records written to DataHub in a single batch. |
| datahub.maxBufferSize | 2 × 1024 × 1024 | Optional | The maximum size of data written in a single request, in bytes. Do not change this parameter. Writing too much data at once may cause the write operation to fail. |
| datahub.batchTimeout | 5 | Optional | The waiting time in seconds before synchronizing data to DataHub if the number of records does not reach the batch size. |
| datahub.retryTimes | 3 | Optional | The number of retries if data synchronization fails. |
| datahub.retryInterval | 5 | Optional | The retry interval in seconds if data synchronization fails. |
| datahub.dirtyDataContinue | true | Optional | Specifies whether to continue processing when dirty data is encountered. If set to true, dirty data is automatically written to the dirty data file with a comma as the separator. This does not affect the processing of subsequent data. |
| datahub.dirtyDataFile | DataHub-Flume-dirty-file | Optional | The file to which dirty data is written. |
| serializer | - | Required | The data parsing method. Supported methods are DELIMITED (separator), JSON (each row is a single-layer JSON object), and REGEX (regular expression). |
| serializer.delimiter | , | Optional | The separator for data fields. To use a special character, enclose it in double quotation marks, such as "\t". |
| serializer.regex | (.*) | Optional | The regular expression for data parsing. The data for each field is parsed into a group. |
| serializer.fieldnames | - | Required | The mapping from input data fields to DataHub fields. Fields are identified by their input order. To skip a field, leave its column name empty. For example, `c1,c2,,c3` maps the first, second, and fourth fields of the input data to the `c1`, `c2`, and `c3` fields in DataHub. |
| serializer.charset | UTF-8 | Optional | The encoding format for data parsing. |
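The positional mapping described for `serializer.fieldnames` can be easier to follow with a concrete run. The sketch below emulates that pairing locally with awk. It is an illustration of the rule as documented above, not the plugin's actual code, and the function name `map_fields` is hypothetical.

```shell
# Hypothetical helper: pair input columns with DataHub field names by position.
# An empty name in the list means the input column at that position is skipped.
map_fields() {
  # $1 = value of serializer.fieldnames, $2 = delimiter; records are read from stdin
  awk -v names="$1" -F"$2" '
    BEGIN { n = split(names, name, ",") }
    {
      out = ""
      for (i = 1; i <= n; i++) {
        if (name[i] == "") continue                      # unnamed column: skip
        out = out (out == "" ? "" : " ") name[i] "=" $i  # named column: keep
      }
      print out
    }'
}

# The example from the table: the third input column is dropped.
printf '%s\n' 'a,b,c,d' | map_fields 'c1,c2,,c3' ','
# prints: c1=a c2=b c3=d
```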

Source parameters

| Name | Default value | Required | Description |
| --- | --- | --- | --- |
| datahub.endPoint | - | Required | The service endpoint of Alibaba Cloud DataHub. |
| datahub.accessId | - | Required | Your Alibaba Cloud AccessKey ID. |
| datahub.accessKey | - | Required | Your Alibaba Cloud AccessKey secret. |
| datahub.project | - | Required | The name of the DataHub project. |
| datahub.topic | - | Required | The name of the DataHub topic. |
| datahub.subId | - | Required | The DataHub subscription ID. |
| datahub.startTime | - | Optional | The point in time from which to start reading data. The format is yyyy-MM-dd HH:mm:ss. If this parameter is set, the subscription is first reset to this point in time, and data is then read based on the subscription. |
| datahub.shard.ids | - | Optional | A comma-separated list of the shard IDs to read from, such as 0,1,2. For each read, a shard is randomly selected from the list for consumption. If this parameter is not specified, coordinated consumption is used, and when multiple sources are configured, shards are automatically assigned to balance the load across the sources. We recommend that you do not set this parameter. |
| datahub.enablePb | true | Optional | Specifies whether to use PB for data transmission. Some Apsara Stack environments do not support PB. In these environments, set this parameter to false. |
| datahub.compressType | none | Optional | The compression type for data transmission. Supported compression types are LZ4 and DEFLATE. |
| datahub.batchSize | 1000 | Optional | The maximum number of records to read from DataHub at a time. |
| datahub.batchTimeout | 5 | Optional | The waiting time in seconds before synchronizing data to DataHub if the number of records does not reach the batch size. |
| datahub.retryTimes | 3 | Optional | The number of retries if data reading fails. The retry interval is fixed at 1 second and cannot be adjusted. |
| datahub.autoCommit | true | Optional | If set to true, the consumer automatically commits offsets. This may cause offsets to be committed before the data is consumed. If set to false, the offset is committed only after the data is submitted to the Flume channel. |
| datahub.offsetCommitTimeout | 30 | Optional | The interval for automatically committing offsets, in seconds. |
| datahub.sessionTimeout | 60 | Optional | The session timeout period. The source uses coordinated consumption. If no heartbeat is sent within the timeout period, the session is automatically closed. |
| serializer | - | Required | The data parsing method. Currently, only DELIMITED (separator) is supported. Each record is written as one line, with its fields in the order of the DataHub schema, separated by the specified delimiter. |
| serializer.delimiter | , | Optional | The separator for data fields. To use a special character, enclose it in double quotation marks, such as "\t". |
| serializer.charset | UTF-8 | Optional | The encoding format for data parsing. |

Use cases

Sink examples

Example 1: DELIMITED serializer

  1. Prepare test data.

    When you use the DELIMITED serializer, each line is treated as a record and parsed using the specified separator. The following example shows how to use Flume to upload batch CSV files to DataHub in near real-time. Save the following content to a local file named /temp/test.csv.

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
    2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
    3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
    4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
    5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
    6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
    7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
    8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

    The corresponding DataHub schema for the test data is as follows:

    | Field name | Field type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure the Flume file.

    In the ${FLUME_HOME}/conf directory, create a file named datahub_basic.conf and add the following content to the file. This example uses Exec Source as the data source. For more information about other sources, see the official Flume documentation.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    Note

    Exec Source may cause data loss because it does not guarantee that events are placed into the channel. For example, if the Flume channel is full when the `tail` command retrieves data, that data is lost. We recommend that you use Spooling Directory Source or Taildir Source instead. This example uses the static file /temp/test.csv as the data source. If the file is a log file that is written dynamically, you can use the `tail -F logFile` command to collect data in real time.

  3. Start Flume.

    The `-Dflume.root.logger=INFO,console` option outputs logs to the console in real time. For more detailed logs, use DEBUG instead of INFO. Run the following command to start Flume and ingest data from the CSV file to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
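Before starting the agent, it can help to confirm that every line of the input file splits into the six columns named in `serializer.fieldnames`. The following is a minimal sketch of such a check. The function name `check_columns` is hypothetical, and the check is purely local: it does not validate field types or contact DataHub.

```shell
# Hypothetical helper: list records whose column count differs from the expected one.
# Returns a non-zero status if at least one such record is found.
check_columns() {
  # $1 = file, $2 = expected number of columns, $3 = delimiter
  awk -F"$3" -v want="$2" '
    NF != want { printf "line %d: %d columns\n", NR, NF; bad = 1 }
    END { exit bad + 0 }' "$1"
}

# Usage for this example (six fields, comma-separated):
#   check_columns /temp/test.csv 6 ,
```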

Example 2: REGEX serializer

  1. Prepare test data.

    When you use the REGEX serializer, each line is treated as a record and parsed using the specified regular expression. The different parts of a record are represented by groups in the expression. The following example shows how to use Flume and regular expressions to upload data from a log file to DataHub in near real-time. The data to synchronize is the content that follows the bracketed timestamp on each line. Save the following test data to a local file named /temp/test.csv:

    [2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
    [2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
    [2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
    [2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
    [2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
    [2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
    [2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
    [2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
    [2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
    [2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
    [2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
    [2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938

    The corresponding DataHub schema for the test data is as follows:

    | Field name | Field type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure the Flume file.

    In the ${FLUME_HOME}/conf directory, create a file named datahub_basic.conf and add the following content to the file. This example uses Exec Source as the data source. For more information about other sources, see the official Flume documentation.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = REGEX
    a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    Note

    Exec Source may cause data loss because it does not guarantee that events are placed into the channel. For example, if the Flume channel is full when the `tail` command retrieves data, that data is lost. We recommend that you use Spooling Directory Source or Taildir Source instead. This example uses the static file /temp/test.csv as the data source. If the file is a log file that is written dynamically, you can use the `tail -F logFile` command to collect data in real time.

  3. Start Flume.

    The `-Dflume.root.logger=INFO,console` option outputs logs to the console in real time. For more detailed logs, use DEBUG instead of INFO. Run the following command to start Flume and ingest data from the log file to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
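A regular expression that silently fails to match is a common source of dirty data, so it can be worth previewing the captured groups locally first. The sketch below uses `sed -E` with a POSIX ERE approximation of the `serializer.regex` value from this example (`\d` becomes `[0-9]`, `\S` becomes `[^[:space:]]`). This translation and the function name `preview_groups` are assumptions for illustration. The plugin itself evaluates the expression with the Java regex engine, which may behave differently on ambiguous input.

```shell
# ERE approximation of the serializer.regex used in this example (an assumption,
# not the exact Java pattern): timestamp in brackets, then six comma-separated groups.
LOG_RE='\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\] ([0-9]+),([^[:space:]]+),([a-z]+),([-+]?[0-9]*\.?[0-9]*),([0-9]+),([-+]?[0-9]*\.?[0-9]*)'

# Hypothetical helper: print the six groups of each matching line, separated by "|".
# Lines that do not match produce no output. Reads the given files, or stdin.
preview_groups() {
  sed -E -n "s/^${LOG_RE}\$/\1|\2|\3|\4|\5|\6/p" "$@"
}

# Usage for this example:
#   preview_groups /temp/test.csv
```

Each output line lists the values that would map, in order, to id, name, gender, salary, my_time, and decimal.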

Example 3: Flume Taildir Source

The `exec` source in Flume may cause data loss. Therefore, we do not recommend that you use it in production environments. To collect local logs, you can use Taildir Source or Spooling Directory Source. The following example shows how to use Taildir to collect log files. `Taildir Source` can monitor a specified group of files and read new lines in near real-time as they are appended to each file. If new lines are being written, this source retries reading them until the write operation is complete. `Taildir Source` stores the read position of each file in a JSON file named `positionFile`. If a source event fails to be placed in the channel, the read position is not updated. This makes `Taildir Source` reliable.

  1. Prepare test data.

    All logs are appended to the file in the following format. The log files use the *.log naming format.

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289

    The corresponding DataHub schema for the test data is as follows:

    | Field name | Field type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure the Flume file.

    In the ${FLUME_HOME}/conf directory, create a file named datahub_basic.conf and add the following content to the file.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /temp/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /temp/.*log
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Start Flume.

    The `-Dflume.root.logger=INFO,console` option outputs logs to the console in real time. For more detailed logs, use DEBUG instead of INFO. Run the following command to start Flume and ingest data from the log files to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
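To see Taildir Source pick up new lines, you can append records to a file that matches the configured file group while the agent is running. The following is a minimal sketch. The function name `append_record` and the file name in the usage comment are hypothetical, and the record reuses the sample values from step 1 with only the id and name changed.

```shell
# Hypothetical helper: append one record in this example's format to a log file.
append_record() {
  # $1 = log file (must match the filegroups pattern, for example /temp/.*log)
  # $2 = value for the id column
  printf '%s,%s,true,1254275.1144629316,1573206062763,1254275.1144637289\n' \
    "$2" "name_$2" >> "$1"
}

# Usage while the agent is running (file name is an assumption):
#   append_record /temp/app.log 1
```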

Example 4: JSON serializer

When you use the JSON serializer, each line is treated as a record. Only the first layer of the JSON object is parsed. Nested content is treated as a string. If a top-level name exists in the configured serializer.fieldnames, its value is added to the corresponding column. The following example shows how to use Flume and JSON parsing to upload data from a log file to DataHub in near real-time.

  1. Prepare test data.

    Save the following content to the local file /temp/test.json. In the last record, name is a nested object, so its content is treated as a string.

    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V","id":1,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l","id":2,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs","id":3,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI","id":4,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV","id":5,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX","id":6,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8","id":7,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}

    The corresponding DataHub schema for the test data is as follows:

    | Field name | Field type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure the Flume file.

    In the ${FLUME_HOME}/conf directory, create a file named datahub_basic.conf and add the following content to the file. This example uses Exec Source as the data source. For more information about other sources, see the official Flume documentation.

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.json
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = JSON
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Start Flume.

    The `-Dflume.root.logger=INFO,console` option outputs logs to the console in real time. For more detailed logs, use DEBUG instead of INFO. Run the following command to start Flume and ingest data from the JSON file to DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Source example

Read data from DataHub to other systems

You can use the DataHub-Flume source to read data from DataHub and move it to another system. This topic uses a logger sink that outputs directly to the console as an example to show how to use the DataHub-Flume source.

  1. The following is an example topic schema.

    | Field name | Field type |
    | --- | --- |
    | id | BIGINT |
    | name | STRING |
    | gender | BOOLEAN |
    | salary | DOUBLE |
    | my_time | TIMESTAMP |
    | decimal | DECIMAL |

  2. Configure the Flume file.

    In the ${FLUME_HOME}/conf directory, create a file named datahub_source.conf and add the following content to the file.

     # A single-node Flume configuration for DataHub
     # Name the components on this agent
     a1.sources = r1
     a1.sinks = k1
     a1.channels = c1
    
     # Describe/configure the source
     a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
     a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
     a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
     a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
     a1.sources.r1.datahub.project = datahub_test
     a1.sources.r1.datahub.topic = test_flume
     a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
     a1.sources.r1.serializer = DELIMITED
     a1.sources.r1.serializer.delimiter = ,
     a1.sources.r1.serializer.charset = UTF-8
     a1.sources.r1.datahub.retryTimes = 3
     a1.sources.r1.datahub.batchSize = 1000
     a1.sources.r1.datahub.batchTimeout = 5
     a1.sources.r1.datahub.enablePb = false
    
     # Describe the sink
     a1.sinks.k1.type = logger
    
     # Use a channel which buffers events in memory
     a1.channels.c1.type = memory
     a1.channels.c1.capacity = 10000
     a1.channels.c1.transactionCapacity = 10000
    
     # Bind the source and sink to the channel
     a1.sources.r1.channels = c1
     a1.sinks.k1.channel = c1
  3. Start Flume.

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console

Flume metrics

DataHub-Flume supports the built-in counter monitor of Flume, which you can use to monitor the running status of the plugin. Both the sink and the source of the DataHub-Flume plugin report metrics. The following tables describe the DataHub-related metrics. For more information about other metrics, see the official Flume documentation.

DatahubSink

| Name | Description |
| --- | --- |
| BatchEmptyCount | The number of times a batch timed out with no data to write to DataHub. |
| BatchCompleteCount | The number of successfully processed batches. This only includes cases where all data was written successfully. |
| EventDrainAttemptCount | The number of records that were attempted to be written to DataHub (the number of successfully parsed records). |
| BatchUnderflowCount | The number of times the amount of data successfully written to DataHub was less than the amount of data that needed to be written. This occurs when data parsing is complete, but writing to DataHub partially or completely fails. |
| EventDrainSuccessCount | The number of records successfully written to DataHub. |

DatahubSource

| Name | Description |
| --- | --- |
| EventReceivedCount | The number of records received by the source from DataHub. |
| EventAcceptedCount | The number of records from DataHub that the source successfully wrote to the channel. |

Flume monitoring

Flume provides multiple monitoring methods. This topic uses HTTP monitoring as an example. To use HTTP monitoring, add two parameters when you start the Flume agent: -Dflume.monitoring.type=http -Dflume.monitoring.port=1234. The `type` parameter specifies the monitoring method, and the `port` parameter specifies the port number. The following is an example:

bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234

After the agent starts, you can view the metrics in JSON format at http://ip:1234/metrics.

Note

For more information about monitoring methods, see the official Flume documentation.
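To track a single counter from the command line, you can filter the JSON that the metrics endpoint returns. The sketch below assumes that counters are rendered as quoted numbers (`"Name":"123"`), which is how Flume's JSON reporting formats them. The function name `metric_values` is hypothetical, and `localhost` in the usage comment stands in for the address of your agent.

```shell
# Hypothetical helper: print every value reported for one counter.
# Reads the metrics JSON from stdin. Prints one value per component that reports it.
metric_values() {
  # $1 = counter name, for example EventDrainSuccessCount
  grep -o "\"$1\":\"[0-9]*\"" | cut -d'"' -f4
}

# Usage against a running agent started with HTTP monitoring on port 1234
# (requires curl; host name is an assumption):
#   curl -s http://localhost:1234/metrics | metric_values EventDrainSuccessCount
```

Comparing EventDrainAttemptCount with EventDrainSuccessCount in this way gives a quick indication of whether writes to DataHub are failing.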

FAQ

Flume fails to start and reports the error: org.apache.flume.ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

The default heap memory for Flume is 20 MB. If you set the `batchSize` parameter to a large value, the heap memory that is used by Flume may exceed 20 MB.

Solution 1: Decrease the value of `batchSize`.

Solution 2: Increase the maximum heap memory for Flume.

  • $ vim bin/flume-ng

  • JAVA_OPTS="-Xmx20m" ==> JAVA_OPTS="-Xmx1024m"
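The edit in Solution 2 can also be applied non-interactively. The following is a minimal sketch that performs the same substitution with `sed` and keeps a backup copy of the script. The function name `raise_heap` is hypothetical, and the sketch assumes that `bin/flume-ng` contains the default line `JAVA_OPTS="-Xmx20m"` exactly as shown above.

```shell
# Hypothetical helper: replace the default 20 MB heap limit in the flume-ng script.
# The original file is preserved next to it with a .bak suffix.
raise_heap() {
  # $1 = path to bin/flume-ng, $2 = new maximum heap size, for example 1024m
  sed -i.bak "s/JAVA_OPTS=\"-Xmx20m\"/JAVA_OPTS=\"-Xmx$2\"/" "$1"
}

# Usage (assumes FLUME_HOME is set in your shell):
#   raise_heap "${FLUME_HOME}/bin/flume-ng" 1024m
```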

Does the DataHub-Flume plugin support the JSON format?

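The sink's JSON serializer parses single-layer JSON objects (see Example 4: JSON serializer). The JSONEvent format is not supported. For such data, you can parse the records using custom regular expressions or modify the DataHub-Flume plugin code to add support for JSONEvent.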

Does the DataHub-Flume plugin support BLOB topics?

No. The DataHub-Flume plugin currently supports only TUPLE topics. BLOB topics are not supported.

Flume reports the error: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count

This error occurs because the channel is full and the source failed to write data to the channel. To resolve this issue, you can increase the channel capacity in the configuration file and reduce the `batchSize` of the DataHub source.

An old version of Flume fails to start because of JAR package conflicts.

  • Scenario: When you use Flume 1.6, the startup may fail with the following error: java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;. This error occurs because the plugin depends on newer versions of these JAR packages than the versions that Flume ships with. When the old JAR packages from Flume are loaded, the new method cannot be found.

  • Solution: Delete the following three JAR packages from the ${FLUME_HOME}/lib directory.

    • jackson-annotations-2.3.0.jar

    • jackson-databind-2.3.1.jar

    • jackson-core-2.3.1.jar
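If you prefer to keep the old JAR packages around in case you need to roll back, you can move them out of `lib` instead of deleting them. The following is a cautious variant of the solution above, not the documented procedure. The function name `quarantine_jars` and the `lib.disabled` directory are hypothetical.

```shell
# Hypothetical helper: move the named JARs from the Flume lib directory into a
# sibling directory called lib.disabled. Names that do not exist are ignored.
quarantine_jars() {
  # $1 = Flume lib directory, remaining arguments = JAR file names
  q_lib=$1
  shift
  mkdir -p "$q_lib/../lib.disabled"
  for q_jar in "$@"; do
    if [ -f "$q_lib/$q_jar" ]; then
      mv "$q_lib/$q_jar" "$q_lib/../lib.disabled/"
    fi
  done
}

# Usage (pass every JAR name from the list above):
#   quarantine_jars "${FLUME_HOME}/lib" jackson-annotations-2.3.0.jar jackson-databind-2.3.1.jar
```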

Empty strings are automatically converted to null during data ingestion with Flume.

In version 2.0.2 of the Flume plugin, non-empty strings are trimmed and empty strings are converted to null. This issue is fixed in version 2.0.3. In version 2.0.3, empty strings are written to DataHub as empty strings.

The startup fails with the error: Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because "com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null

Delete the `guava` and `zstd` JAR files from the Flume `lib` folder and restart Flume.