The DataHub plug-in for Flume is a plug-in for data subscription and publishing that is developed based on Apache Flume. The plug-in can write collected data to DataHub, and can also read data from DataHub and write the data to other systems. It complies with the development conventions of Apache Flume plug-ins and is easy to install. You can use the plug-in to publish data to DataHub and subscribe to data in DataHub.
Install the DataHub plug-in for Flume
Limits for installation
The version of Java Development Kit (JDK) must be 1.8 or later.
The version of Apache Maven must be 3.X.
The version of Flume-NG must be 1.X.
Install the DataHub plug-in for Flume
Download Apache Flume. Skip this step if Apache Flume is already downloaded.
$ tar zxvf apache-flume-1.11.0-bin.tar.gz
Note: For ease of illustration, ${FLUME_HOME} is used in the following information to specify the home directory of Apache Flume.
Install the DataHub plug-in for Flume.
Directly install the DataHub plug-in for Flume.
Download the DataHub plug-in for Flume.
Extract the DataHub plug-in for Flume from the package and save the plug-in in the ${FLUME_HOME}/plugins.d directory.
$ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
$ cd aliyun-flume-datahub-sink-x.x.x
$ mkdir ${FLUME_HOME}/plugins.d
$ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
Use source code to install the DataHub plug-in for Flume.
Download the source code from aliyun-maxcompute-data-collectors.
Compile the source code to install the plug-in.
$ cd aliyun-maxcompute-data-collectors
$ mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
$ cd flume-plugin/target
$ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
$ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
Parameters
Sink-related parameters
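The following sketch shows commonly used sink parameters. The agent, sink, and channel names (a1, k1, c1) and all values are placeholders; verify the exact parameter names and defaults against the plug-in version that you use.
# DataHub sink (values are placeholders)
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.endPoint = <yourDatahubEndpoint>
a1.sinks.k1.datahub.accessId = <yourAccessKeyId>
a1.sinks.k1.datahub.accessKey = <yourAccessKeySecret>
a1.sinks.k1.datahub.project = <yourProjectName>
a1.sinks.k1.datahub.topic = <yourTopicName>
# Number of records to write to DataHub per batch.
a1.sinks.k1.batchSize = 100
# How each event body is parsed into topic fields: DELIMITED, REGEX, or JSON.
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = field1,field2
a1.sinks.k1.serializer.charset = UTF-8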
Source-related parameters
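Similarly, the following sketch shows commonly used source parameters. All values are placeholders, and the parameter names are assumptions to be verified against the plug-in version that you use.
# DataHub source (values are placeholders)
a1.sources.r1.type = com.aliyun.datahub.flume.source.DatahubSource
a1.sources.r1.datahub.endPoint = <yourDatahubEndpoint>
a1.sources.r1.datahub.accessId = <yourAccessKeyId>
a1.sources.r1.datahub.accessKey = <yourAccessKeySecret>
a1.sources.r1.datahub.project = <yourProjectName>
a1.sources.r1.datahub.topic = <yourTopicName>
# ID of the DataHub subscription that is used to track consumption offsets.
a1.sources.r1.datahub.subId = <yourSubscriptionId>
# Number of records to read from DataHub per batch.
a1.sources.r1.batchSize = 100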
Case description
Sink use cases
Case 1: DELIMITED serializer
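The following configuration sketch illustrates this case: an exec source tails a comma-separated file, and the DELIMITED serializer maps each column, in order, to a field of a TUPLE topic. The file path, field names, and account values are placeholders; verify the parameter names against the parameter tables above.
# datahub_basic.conf (sketch; values are placeholders)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Read comma-separated records from a local file.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/test.csv
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.endPoint = <yourDatahubEndpoint>
a1.sinks.k1.datahub.accessId = <yourAccessKeyId>
a1.sinks.k1.datahub.accessKey = <yourAccessKeySecret>
a1.sinks.k1.datahub.project = <yourProjectName>
a1.sinks.k1.datahub.topic = <yourTopicName>
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
# Map the delimited columns, in order, to these topic fields.
a1.sinks.k1.serializer.fieldnames = id,name
a1.sinks.k1.batchSize = 100
a1.sinks.k1.channel = c1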
Case 2: REGEX serializer
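In this case, only the serializer settings differ from Case 1: the REGEX serializer extracts field values from each line by using the capture groups of a regular expression. The expression below is a placeholder sketch, and the serializer.regex parameter name is an assumption to be verified against your plug-in version.
a1.sinks.k1.serializer = REGEX
# Each capture group is mapped, in order, to a field in serializer.fieldnames.
a1.sinks.k1.serializer.regex = ([0-9]+),([a-zA-Z]+)
a1.sinks.k1.serializer.fieldnames = id,name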
Case 3: Flume taildir source
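In this case, the exec source of Case 1 is replaced with the TAILDIR source that Flume provides. The TAILDIR source records its read offsets in a position file, so an agent restart resumes collection without re-reading or losing data. The paths below are placeholders.
a1.sources.r1.type = TAILDIR
# Offsets are persisted here so that restarts resume where collection stopped.
a1.sources.r1.positionFile = /tmp/taildir_position.json
a1.sources.r1.filegroups = f1
# Collect all .log files in the directory.
a1.sources.r1.filegroups.f1 = /tmp/logs/.*log
a1.sources.r1.channels = c1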
Case 4: JSON serializer
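In this case, each event body is parsed as a JSON object. The following sketch assumes a plug-in version whose sink provides a JSON serializer (see the FAQ below for versions that do not) and that fields are matched to the topic schema by top-level key name:
# Parse each event body as a JSON object (assumption: fields are matched
# to the topic schema by top-level key name).
a1.sinks.k1.serializer = JSON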
Source use cases
Read data from DataHub to other systems
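As an illustration, the following sketch shows an agent that uses the DataHub source and writes what it reads to the console through the logger sink that Flume provides. The source parameter names follow the sketch in the Source-related parameters section and are placeholders.
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = com.aliyun.datahub.flume.source.DatahubSource
a1.sources.r1.datahub.endPoint = <yourDatahubEndpoint>
a1.sources.r1.datahub.accessId = <yourAccessKeyId>
a1.sources.r1.datahub.accessKey = <yourAccessKeySecret>
a1.sources.r1.datahub.project = <yourProjectName>
a1.sources.r1.datahub.topic = <yourTopicName>
a1.sources.r1.datahub.subId = <yourSubscriptionId>
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Print the records that are read from DataHub to the console.
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1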
Flume metric
The DataHub plug-in for Flume supports the built-in counter metrics of Flume. You can monitor the operations of the plug-in based on these metrics. The sinks and sources of the DataHub plug-in for Flume support different metrics. The following table describes the DataHub-related metrics. For more information about other metrics, see Available Component Metrics.
DatahubSink
DatahubSource
Flume monitoring
Apache Flume provides various monitoring methods. This part describes how to enable HTTP monitoring. For more information about other monitoring methods, see Monitoring. To enable HTTP monitoring, add the following two parameters when you start the DataHub plug-in for Flume: -Dflume.monitoring.type=http and -Dflume.monitoring.port=1234. The value http of the flume.monitoring.type parameter indicates HTTP monitoring, and the value 1234 of the flume.monitoring.port parameter indicates the port number. The following code provides an example on how to start the plug-in:
bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
After the plug-in is started, you can open the web page to view the metrics. The URL is http://ip:1234/metrics.
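You can also query the JSON-formatted metrics from the command line. For example (ip is the host on which the Flume agent runs):
$ curl http://ip:1234/metrics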
For more information about other monitoring methods, see Flume 1.9.0 User Guide.
FAQ
What can I do if the "org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight" error is reported when I start the DataHub plug-in for Flume?
The default maximum heap memory of the DataHub plug-in for Flume is 20 MB. If the number of records to be written at a time is large, the heap memory that the plug-in uses exceeds 20 MB. You can use one of the following solutions to resolve the issue:
Solution 1: Reduce the value of the batchSize parameter.
Solution 2: Increase the maximum heap memory of the DataHub plug-in for Flume.
$ vim bin/flume-ng
JAVA_OPTS="-Xmx20m" ==> JAVA_OPTS="-Xmx1024m"
Does the DataHub plug-in for Flume support the JSON format?
No. However, you can use custom regular expressions to parse data, or modify the code of the DataHub plug-in for Flume and add JSONEvent to support the JSON format.
Does the DataHub plug-in for Flume support topics whose data type is BLOB?
No. The DataHub plug-in for Flume supports only topics whose data type is TUPLE.
Why does the DataHub plug-in for Flume report the "org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count" error?
The Flume channel is full, and the source fails to write data to the Flume channel. You can modify the channel capacity in the configuration file and appropriately reduce the value of the batchSize parameter to resolve this issue.
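The following sketch shows the relevant settings, assuming a memory channel named c1 and a sink named k1. The values are placeholders to be tuned for your workload.
a1.channels.c1.type = memory
# Increase the number of events that the channel can hold.
a1.channels.c1.capacity = 10000
# transactionCapacity must be no less than the batch sizes of the attached source and sink.
a1.channels.c1.transactionCapacity = 1000
# Reduce the number of records that the sink writes per batch.
a1.sinks.k1.batchSize = 100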
What can I do if the DataHub plug-in for Flume fails to start due to a JAR package conflict when I use an earlier version of Apache Flume?
For example, when Apache Flume V1.6 is used, the java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader; error is reported. The plug-in of a later version and Apache Flume V1.6 depend on different versions of the same JAR packages. Apache Flume V1.6 loads the earlier versions, and therefore the methods that the plug-in requires cannot be found. To resolve this issue, delete the following three JAR packages from the ${FLUME_HOME}/lib directory:
jackson-annotations-2.3.0.jar
jackson-databind-2.3.1.jar
jackson-core-2.3.1.jar
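For example (a sketch; the exact file names can differ across Flume releases, so check the directory first):
$ rm ${FLUME_HOME}/lib/jackson-annotations-2.3.0.jar
$ rm ${FLUME_HOME}/lib/jackson-databind-2.3.1.jar
$ rm ${FLUME_HOME}/lib/jackson-core-2.3.1.jar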
What can I do if empty strings are automatically converted to NULL when I use the DataHub plug-in for Flume to collect data?
In the DataHub plug-in for Flume V2.0.2, the trim() method is applied to non-empty strings, and empty strings are directly converted to NULL. This logic is removed in the DataHub plug-in for Flume V2.0.3: empty strings are written to DataHub as-is rather than converted to NULL.
What can I do if the "Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because "com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null" error is reported?
Delete the guava.jar and zstd.jar files in the lib directory of Flume.
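For example (a sketch; the actual file names usually carry version suffixes, so check the lib directory first):
$ rm ${FLUME_HOME}/lib/guava-*.jar
$ rm ${FLUME_HOME}/lib/zstd-*.jar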