The Flume-DataHub plugin is a data publishing and subscription plugin for DataHub built on Flume. You can use it to write data to DataHub, or to read data from DataHub and write it to other systems. The plugin follows Flume plugin development conventions, is easy to install, and makes it easy to publish data to DataHub and subscribe to data from it.
Install the Flume plugin
Installation requirements
JDK 1.8 or later.
Apache Maven 3.x.
Flume-NG 1.x.
Install Flume
Download and extract Flume. You can skip the download if you have already downloaded Flume.
$ tar zxvf apache-flume-1.11.0-bin.tar.gz
Note: In this document, ${FLUME_HOME} refers to the Flume home directory.
Install Flume-DataHub.
Direct installation
Download the Flume-DataHub plugin.
Extract the Flume-DataHub plugin and move it to the ${FLUME_HOME}/plugins.d directory.
$ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
$ cd aliyun-flume-datahub-sink-x.x.x
$ mkdir -p ${FLUME_HOME}/plugins.d
$ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
Install from source code.
Download the source code from aliyun-maxcompute-data-collectors.
Compile and install.
$ cd aliyun-maxcompute-data-collectors
$ mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
$ cd flume-plugin/target
$ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
$ mkdir -p ${FLUME_HOME}/plugins.d
$ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
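To confirm that either installation method produced a layout Flume can load, the following shell function is a minimal sketch that checks the plugins.d convention described in the Flume documentation: each plugin directory keeps its own JARs in lib and its dependency JARs in libext. The function name check_flume_plugin is hypothetical, and the default directory name aliyun-flume-datahub-sink is taken from the commands above.

```shell
# Hypothetical helper: checks that a Flume plugin directory exists under
# ${FLUME_HOME}/plugins.d and contains at least one JAR in lib/ or libext/.
# Usage: check_flume_plugin [plugin-dir-name]
check_flume_plugin() {
  if [ -z "${FLUME_HOME:-}" ]; then
    echo "FLUME_HOME is not set" >&2
    return 1
  fi
  local plugin_dir="${FLUME_HOME}/plugins.d/${1:-aliyun-flume-datahub-sink}"
  if [ ! -d "$plugin_dir" ]; then
    echo "missing plugin directory: $plugin_dir" >&2
    return 1
  fi
  local jar
  for jar in "$plugin_dir"/lib/*.jar "$plugin_dir"/libext/*.jar; do
    # An unmatched glob stays literal, so check that the file really exists.
    if [ -e "$jar" ]; then
      echo "ok: $plugin_dir"
      return 0
    fi
  done
  echo "no JAR files in $plugin_dir/lib or $plugin_dir/libext" >&2
  return 1
}
```

A common failure this catches is running mv before ${FLUME_HOME}/plugins.d exists: mv then renames the plugin directory to plugins.d instead of moving it into that directory, so no aliyun-flume-datahub-sink subdirectory is found.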
Parameter reference
Sink parameters
Source parameters
Use case
Sink examples
Example 1: DELIMITED serializer
Example 2: REGEX serializer
Example 3: Flume Taildir Source
Example 4: JSON serializer
Source example
Read data from DataHub to other systems
Flume metrics
The DataHub-Flume plugin supports Flume's built-in counter monitoring, which you can use to monitor the running status of the plugin. Both the sink and the source of the plugin report metrics. The following tables describe the DataHub-related metrics. For information about other metrics, see the official Flume documentation.
DatahubSink
DatahubSource
Flume monitoring
Flume provides multiple monitoring methods. This topic uses HTTP monitoring as an example to show how to use the monitoring tools of Flume. To enable HTTP monitoring, add two parameters when you start the Flume agent: -Dflume.monitoring.type=http -Dflume.monitoring.port=1234. The `type` parameter specifies the monitoring method, and the `port` parameter specifies the port number. Example:
bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
After the agent starts, you can view the metrics in JSON format at http://<ip>:1234/metrics.
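If you check the metrics from a script, the following shell function is a minimal sketch that fetches the JSON document served by Flume's HTTP monitoring. The function name flume_metrics is hypothetical; it assumes curl is installed and uses plain HTTP, which is what Flume's built-in HTTP monitoring serves.

```shell
# Hypothetical helper: prints the JSON metrics that Flume's HTTP monitoring
# serves at /metrics. HOST defaults to localhost and PORT to 1234, the port
# passed with -Dflume.monitoring.port in the example command.
# Usage: flume_metrics [host] [port]
flume_metrics() {
  local host="${1:-localhost}" port="${2:-1234}"
  # -s hides the progress meter; -f makes curl fail on HTTP error codes.
  curl -sf "http://${host}:${port}/metrics"
}
```

For readable output, pipe the result through a JSON formatter, for example flume_metrics 127.0.0.1 1234 | python3 -m json.tool. In Flume's JSON report, each component appears under a key that combines its type and name, such as SINK.k1 or CHANNEL.c1, where k1 and c1 are the names used in your configuration file.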
For more information about monitoring methods, see the official Flume documentation.
FAQ
Flume fails to start and reports the error: org.apache.flume.ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
The default maximum heap size of Flume is 20 MB (-Xmx20m). If you set the `batchSize` parameter to a large value, Flume may need more than 20 MB of heap memory.
Solution 1: Decrease the value of `batchSize`.
Solution 2: Increase the maximum heap memory for Flume.
$ vim bin/flume-ng
Change JAVA_OPTS="-Xmx20m" to JAVA_OPTS="-Xmx1024m"
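If you prefer not to edit bin/flume-ng by hand, the following shell function is a minimal sketch that rewrites the -Xmx setting with sed. The function name set_flume_heap is hypothetical. It keeps a copy of the original script with a .bak suffix; the attached -i.bak form it uses is accepted by both GNU and BSD sed.

```shell
# Hypothetical helper: replaces the first -Xmx<size> value on each line of a
# Flume launcher script (for example bin/flume-ng) and keeps a .bak backup.
# Usage: set_flume_heap <script> <size>, for example set_flume_heap bin/flume-ng 1024m
set_flume_heap() {
  local script="$1" size="$2"
  if [ ! -f "$script" ]; then
    echo "usage: set_flume_heap <script> <size>" >&2
    return 1
  fi
  # Accept only a number with an optional k, m, or g unit, such as 1024m or 2g.
  if ! printf '%s\n' "$size" | grep -Eq '^[0-9]+[kKmMgG]?$'; then
    echo "invalid heap size: $size" >&2
    return 1
  fi
  sed -i.bak -E "s/-Xmx[0-9]+[kKmMgG]?/-Xmx${size}/" "$script"
  # Show the updated lines so the change can be checked.
  grep -n -- '-Xmx' "$script"
}
```

Restart the Flume agent after the change so that the new heap size takes effect.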
Does the DataHub-Flume plugin support the JSON format?
No, it does not. However, you can parse data using custom regular expressions or modify the DataHub-Flume plugin code to add support for JSONEvent.
Does the DataHub-Flume plugin support BLOB topics?
No. The DataHub-Flume plugin currently supports only TUPLE topics, not BLOB topics.
Flume reports the error: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count
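This error occurs because the channel is full and the source cannot write data to it. The capacity reported in the message is the transaction capacity of the memory channel. To resolve this issue, increase the channel capacity (the capacity and transactionCapacity properties) in the configuration file, and reduce the `batchSize` of the DataHub source.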
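Because the capacity in this message is the memory channel's transactionCapacity, a quick check is to compare that value with the source batchSize before you restart the agent: a source puts a whole batch into the channel in one transaction. The following shell functions are a minimal sketch of that check; the names get_prop and check_channel_capacity, and the file and channel names in the usage comment, are hypothetical. They rely on two rules of Flume's memory channel: capacity and transactionCapacity both default to 100, and transactionCapacity must not exceed capacity.

```shell
# Hypothetical helper: prints the last value assigned to KEY in a Flume
# properties file, ignoring comment lines and whitespace around "=".
get_prop() {
  awk -v key="$2" '
    /^[ \t]*#/ { next }
    {
      pos = index($0, "=")
      if (pos == 0) next
      k = substr($0, 1, pos - 1)
      v = substr($0, pos + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", k)
      gsub(/^[ \t]+|[ \t]+$/, "", v)
      if (k == key) val = v
    }
    END { if (val != "") print val }
  ' "$1"
}

# Hypothetical helper: checks a memory channel against a source batch size.
# Usage: check_channel_capacity <conf-file> <channel-prefix> <batch-size>
# Example: check_channel_capacity conf/datahub_source.conf a1.channels.c1 1000
check_channel_capacity() {
  local conf="$1" channel="$2" batch="$3" cap txcap
  cap=$(get_prop "$conf" "${channel}.capacity")
  txcap=$(get_prop "$conf" "${channel}.transactionCapacity")
  # Fall back to the memory channel defaults when a property is not set.
  cap=${cap:-100}
  txcap=${txcap:-100}
  if [ "$txcap" -gt "$cap" ]; then
    echo "transactionCapacity ($txcap) exceeds capacity ($cap)" >&2
    return 1
  fi
  if [ "$txcap" -lt "$batch" ]; then
    echo "transactionCapacity ($txcap) is smaller than batchSize ($batch)" >&2
    return 1
  fi
  echo "ok: capacity=$cap transactionCapacity=$txcap batchSize=$batch"
}
```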
Older versions of Flume may fail to start because of JAR package conflicts.
Scenario: When you use Flume 1.6, the startup may fail and the following error is reported:
java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;
This error occurs because the plugin depends on newer versions of the Jackson JAR packages than the versions bundled with Flume. When Flume loads its own older JAR packages, the new method cannot be found.
Solution: Delete the following three JAR packages from the ${FLUME_HOME}/lib directory.
jackson-annotations-2.3.0.jar
jackson-databind-2.3.1.jar
jackson-annotations-2.3.0.jar
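Before deleting anything, you can list the Jackson 2 JAR packages in ${FLUME_HOME}/lib to confirm the exact file names and versions in your installation. The following shell function is a minimal sketch; its name is hypothetical, and it assumes, as we understand the Jackson API, that ObjectMapper.readerFor was added in jackson-databind 2.6. It therefore reports jackson-annotations, jackson-core, and jackson-databind JARs whose version is below 2.6.

```shell
# Hypothetical helper: prints Jackson 2 JARs in LIB_DIR whose minor version is
# below 6 (for example jackson-databind-2.3.1.jar). Returns 1 if none are found.
# Usage: find_old_jackson_jars <lib-dir>
find_old_jackson_jars() {
  local lib_dir="$1" found=1 jar name minor
  for jar in "$lib_dir"/jackson-annotations-2.*.jar \
             "$lib_dir"/jackson-core-2.*.jar \
             "$lib_dir"/jackson-databind-2.*.jar; do
    [ -e "$jar" ] || continue   # skip patterns that matched no file
    name=$(basename "$jar")
    # Extract the minor version: jackson-databind-2.3.1.jar -> 3
    minor=$(printf '%s\n' "$name" | sed -E 's/^jackson-[a-z]+-2\.([0-9]+)\..*$/\1/')
    case "$minor" in
      ''|*[!0-9]*) continue ;;
    esac
    if [ "$minor" -lt 6 ]; then
      echo "$name"
      found=0
    fi
  done
  return "$found"
}
```

For example, run find_old_jackson_jars "${FLUME_HOME}/lib" and delete the files it prints. Jackson 1 packages such as jackson-core-asl are not reported, because they use a different package name.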
Empty strings are automatically converted to null during data ingestion with Flume.
In version 2.0.2 of the Flume plugin, non-empty strings are trimmed and empty strings are converted to null. This issue is fixed in version 2.0.3, which writes empty strings to DataHub as empty strings.
Startup fails with the error: Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because "com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null
Delete the `guava` and `zstd` JAR files from the Flume `lib` folder and restart Flume.
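Instead of deleting the JAR files outright, you can move them to a directory outside lib so that Flume no longer loads them and the change is easy to undo. The following shell function is a minimal sketch; the function name and the lib-disabled directory name are hypothetical, and the file name patterns in the usage comment assume the JAR names start with guava or contain zstd.

```shell
# Hypothetical helper: moves JARs matching each glob PATTERN from LIB_DIR into
# BACKUP_DIR. Returns 1 if no file was moved.
# Usage: move_jars_aside "${FLUME_HOME}/lib" "${FLUME_HOME}/lib-disabled" 'guava*.jar' '*zstd*.jar'
move_jars_aside() {
  local lib_dir="$1" backup_dir="$2" moved=0 pattern jar
  shift 2
  mkdir -p "$backup_dir" || return 1
  for pattern in "$@"; do
    # $pattern is left unquoted on purpose so the shell expands the glob.
    for jar in "$lib_dir"/$pattern; do
      [ -e "$jar" ] || continue   # the glob matched nothing
      if mv "$jar" "$backup_dir"/; then
        echo "moved $(basename "$jar") to $backup_dir"
        moved=$((moved + 1))
      fi
    done
  done
  [ "$moved" -gt 0 ]
}
```

To roll back, move the files from the backup directory back into ${FLUME_HOME}/lib and restart Flume.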