You can configure a custom sink to integrate additional data storage components. You can also configure a custom sink to tailor or optimize the features of an existing sink based on your business requirements. This topic provides an example on how to configure a custom sink.
Prerequisites
- An E-MapReduce (EMR) cluster is created, and Flume is selected from the optional services when you create the cluster. For more information, see Create a cluster.
- SSH Secure File Transfer Client is installed on your computer.
Procedure
- Create a custom sink.
- Add the following dependency to the pom.xml file:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
Note: 1.9.0 is the version of Flume. Replace the version based on the information of the cluster that you created.
- Write a custom sink class.
In this example, the org.example.MySink class implements a sink that is based on LoggerSink but has a larger default buffer.

package org.example;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(MySink.class);

    // Default maximum number of bytes to dump
    public static final int DEFAULT_MAX_BYTE_DUMP = 32;
    public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";

    // Maximum number of bytes to be dumped
    private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;

    @Override
    public void configure(Context context) {
        this.maxBytesToLog = context.getInteger(MAX_BYTES_DUMP_KEY, DEFAULT_MAX_BYTE_DUMP);
    }

    @Override
    public void start() {
        // Initialize the connection to the external repository (e.g. HDFS) that
        // this sink will forward events to.
    }

    @Override
    public void stop() {
        // Disconnect from the external repository and do any additional cleanup
        // (e.g. releasing resources or nulling out field values).
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;

        // Start the transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        Event event = null;
        try {
            txn.begin();
            // This try clause includes whatever channel operations you want to do
            event = ch.take();

            // Send the event to the external repository.
            // storeSomeData(event);
            if (event != null) {
                if (logger.isInfoEnabled()) {
                    logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
                }
            } else {
                // No event found, request back-off semantics from the sink runner
                status = Status.BACKOFF;
            }
            txn.commit();
        } catch (Exception e) {
            txn.rollback();
            throw new EventDeliveryException("Failed to log event: " + event, e);
        } finally {
            txn.close();
        }
        return status;
    }
}
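The transaction contract in process() can be illustrated without Flume on the classpath. The following self-contained sketch (the MiniSink class and drainOne method are hypothetical names introduced only for this illustration, with a plain queue standing in for the channel) mimics the take/commit behavior and the back-off signal that a sink returns when the channel is empty:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class MiniSink {
    enum Status { READY, BACKOFF }

    // Stand-in for a Flume channel: a plain FIFO queue of event bodies.
    static Queue<String> channel = new ArrayDeque<>();

    // Mirrors the shape of MySink.process(): take one event and report READY,
    // or ask the sink runner to back off when the channel is empty.
    static Status drainOne() {
        String event = channel.poll();          // like ch.take()
        if (event == null) {
            return Status.BACKOFF;              // no event: request back-off semantics
        }
        System.out.println("Event: " + event);  // like logger.info(EventHelper.dumpEvent(...))
        return Status.READY;
    }

    public static void main(String[] args) {
        channel.add("hello");
        System.out.println(drainOne()); // one event consumed
        System.out.println(drainOne()); // channel empty, sink runner should back off
    }
}
```

Returning BACKOFF instead of spinning in a tight loop is what lets the sink runner sleep between polls when no data is available; the real sink adds a transaction around the take so a failed delivery can be rolled back and retried.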
- Package the custom code into a JAR file. In the directory where the pom.xml file is stored, run the following command:
mvn clean package -DskipTests
- Use SSH Secure File Transfer Client to upload the generated JAR file to the /opt/apps/FLUME/flume-current/lib directory of Flume.
Note: If your cluster is not an EMR cluster, upload the JAR file to the lib directory of the actual Flume installation.
- Add configurations.
- Log on to your cluster in SSH mode. For more information, see Log on to a cluster.
- Run the following command to go to the /conf directory:
cd /opt/apps/FLUME/flume-current/conf
- Run the following command to add a configuration file:
vim custom_sink.conf
Note: In this example, the file is named custom_sink.conf. You can use a different file name.
- Add the following content to the custom_sink.conf file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.apache.flume.source.StressSource
a1.sources.r1.maxEventsPerSecond = 1
a1.sources.r1.batchSize = 1
a1.sources.r1.maxTotalEvents = 100

a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.maxBytesToLog = 64

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: maxBytesToLog specifies the maximum number of bytes of an event body that the sink dumps to the log.
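The effect of maxBytesToLog can be seen in the console output later in this topic: with the value raised from the default 32 to 64, each event is dumped as four rows of 16 bytes. The sketch below is not the actual EventHelper.dumpEvent implementation; hexDump is a hypothetical helper, introduced only to illustrate how a byte limit truncates the dump:

```java
import java.util.Arrays;

public class DumpDemo {
    // Hypothetical helper that mimics the effect of maxBytesToLog:
    // only the first maxBytesToLog bytes of the event body are rendered,
    // 16 bytes per row, similar to the Flume console output.
    static String hexDump(byte[] body, int maxBytesToLog) {
        StringBuilder sb = new StringBuilder();
        int limit = Math.min(body.length, maxBytesToLog);
        for (int i = 0; i < limit; i++) {
            if (i > 0 && i % 16 == 0) sb.append('\n');
            sb.append(String.format("%02X ", body[i]));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        byte[] body = new byte[100];
        Arrays.fill(body, (byte) 0x7F);
        // With maxBytesToLog = 64, only 4 rows of 16 bytes are printed,
        // even though the event body is 100 bytes long.
        System.out.println(hexDump(body, 64));
    }
}
```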
- Start Flume.
- Run the following command to go to the /flume-current directory:
cd /opt/apps/FLUME/flume-current
- Run the following command to start Flume:
bin/flume-ng agent --name a1 -c conf -f conf/custom_sink.conf -Dflume.root.logger=INFO,console
The following information is returned:
2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2021-07-16 14:49:29,526 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.StressSource.doStart(StressSource.java:169)] Stress source doStart finished
2021-07-16 14:49:29,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
2021-07-16 14:49:30,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
2021-07-16 14:49:31,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
2021-07-16 14:49:32,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
2021-07-16 14:49:33,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }