You can configure a custom sink to integrate additional data storage components, or to tailor or optimize the features of an existing sink based on your business requirements. This topic provides an example of how to configure a custom sink.

Prerequisites

  • An E-MapReduce (EMR) cluster is created, and Flume is selected from the optional services when you create the cluster. For more information, see Create a cluster.
  • SSH Secure File Transfer Client is installed on your computer.

Procedure

  1. Create a custom sink.
    1. Add the following dependency to the pom.xml file:
      <dependencies>
          <dependency>
              <groupId>org.apache.flume</groupId>
              <artifactId>flume-ng-core</artifactId>
              <version>1.9.0</version>
          </dependency>
      </dependencies>
      Note In this example, the Flume version is 1.9.0. Replace it with the Flume version that is installed on the cluster that you created.
    2. Write a custom sink class.
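      In this example, org.example.MySink is modeled on LoggerSink: it logs each event that it takes from the channel, and it uses a larger default value for the maximum number of event body bytes to log.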
      package org.example;
      
      import org.apache.flume.Channel;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.Transaction;
      import org.apache.flume.conf.Configurable;
      import org.apache.flume.event.EventHelper;
      import org.apache.flume.sink.AbstractSink;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class MySink extends AbstractSink implements Configurable {
          private static final Logger logger = LoggerFactory.getLogger(MySink.class);
      
          // Default Max bytes to dump
          public static final int DEFAULT_MAX_BYTE_DUMP = 32;
      
          // Max number of bytes to be dumped
          private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
      
          public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";
      
          @Override
          public void configure(Context context) {
              this.maxBytesToLog = context.getInteger(MAX_BYTES_DUMP_KEY, DEFAULT_MAX_BYTE_DUMP);
          }
      
          @Override
          public void start() {
              // Initialize the connection to the external repository (e.g. HDFS) that
              // this Sink will forward Events to ..
          }
      
          @Override
          public void stop () {
              // Disconnect from the external repository and do any
              // additional cleanup (e.g. releasing resources or nulling-out
              // field values) ..
          }
      
          @Override
          public Status process() throws EventDeliveryException {
              Status status = Status.READY;
      
              // Start transaction
              Channel ch = getChannel();
              Transaction txn = ch.getTransaction();
              Event event = null;
              try {
                  txn.begin();
                  // This try clause includes whatever Channel operations you want to do
                  event = ch.take();
                  // Send the Event to the external repository, for example:
                  // storeSomeData(event);
      
                  if (event != null) {
                      if (logger.isInfoEnabled()) {
                          logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
                      }
                  } else {
                      // No event found, request back-off semantics from the sink runner
                      status = Status.BACKOFF;
                  }
                  txn.commit();
              } catch (Exception e) {
                  txn.rollback();
                  throw new EventDeliveryException("Failed to log event: " + event, e);
              } finally {
                  txn.close();
              }
              return status;
          }
      }
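The take, commit, and rollback sequence in process() can be illustrated without any Flume dependency. The following is a minimal sketch, not Flume code: ToyChannel, ToySink, and Store are hypothetical names invented for this illustration, and the toy channel folds the transaction into the channel itself for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a channel with one implicit transaction. */
final class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    // Events taken since the last commit or rollback.
    private final List<String> taken = new ArrayList<>();

    void put(String event) {
        queue.addLast(event);
    }

    /** Returns the next event, or null if the channel is empty. */
    String take() {
        String event = queue.pollFirst();
        if (event != null) {
            taken.add(event);
        }
        return event;
    }

    /** Commit: the taken events are removed for good. */
    void commit() {
        taken.clear();
    }

    /** Rollback: the taken events go back to the head of the queue in their original order. */
    void rollback() {
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
        taken.clear();
    }

    int size() {
        return queue.size();
    }
}

/** Hypothetical sink that mirrors the control flow of MySink.process(). */
final class ToySink {
    enum Status { READY, BACKOFF }

    /** Hypothetical external store; write may throw to simulate a failure. */
    interface Store {
        void write(String event);
    }

    static Status process(ToyChannel channel, Store store) {
        try {
            Status status = Status.READY;
            String event = channel.take();
            if (event != null) {
                store.write(event);
            } else {
                // Nothing to deliver: ask the caller to back off.
                status = Status.BACKOFF;
            }
            channel.commit();
            return status;
        } catch (RuntimeException e) {
            // Delivery failed: return the event to the channel so that it is not lost.
            channel.rollback();
            throw e;
        }
    }
}
```

In this sketch, a store that throws leaves the event in the channel, so a later call to process can retry it. This is the property that the rollback in the catch block of MySink is meant to provide.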
  2. Package the custom code into a JAR file.
    In the directory where the pom.xml file is stored, run the following command to create a JAR file:
    mvn clean package -DskipTests
  3. Use SSH Secure File Transfer Client to upload the generated JAR file to the /opt/apps/FLUME/flume-current/lib directory of Flume.
    Note If your cluster is not an EMR cluster, upload the JAR file to the lib directory in the actual installation directory of Flume.
  4. Add configurations.
    1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.
    2. Run the following command to go to the conf directory of Flume:
      cd /opt/apps/FLUME/flume-current/conf
    3. Run the following command to add a configuration file:
      vim custom_sink.conf
      Note In this example, the custom_sink.conf file is added. You can customize the name of the file.
    4. Add the following content to the custom_sink.conf file:
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.r1.type = org.apache.flume.source.StressSource
      a1.sources.r1.maxEventsPerSecond = 1
      a1.sources.r1.batchSize = 1
      a1.sources.r1.maxTotalEvents = 100
      
      a1.sinks.k1.type = org.example.MySink
      a1.sinks.k1.maxBytesToLog = 64
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      Note maxBytesToLog specifies the maximum number of bytes of each event body that the sink writes to the log. If you do not set this parameter, MySink uses its default value of 32.
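A sink or source that references a channel the agent never declares is an easy mistake to make in a file like this. As a rough pre-flight check, the following sketch loads the text of the file with java.util.Properties and reports such mismatches. AgentConfigCheck and its check method are hypothetical helpers, not part of Flume, and the sketch looks only at the sources, sinks, and channels wiring keys used in this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/** Hypothetical helper: checks that sources and sinks reference declared channels. */
final class AgentConfigCheck {

    /** Returns a list of wiring problems for the given agent name; an empty list means none were found. */
    static List<String> check(String confText, String agent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Set<String> channels = names(props, agent + ".channels");
        List<String> problems = new ArrayList<>();

        // Each sink is bound to exactly one channel through the "channel" key.
        for (String sink : names(props, agent + ".sinks")) {
            String key = agent + ".sinks." + sink + ".channel";
            String channel = props.getProperty(key, "").trim();
            if (!channels.contains(channel)) {
                problems.add(key + " = '" + channel + "' is not a declared channel");
            }
        }

        // Each source lists one or more channels through the "channels" key.
        for (String source : names(props, agent + ".sources")) {
            String key = agent + ".sources." + source + ".channels";
            Set<String> used = names(props, key);
            if (used.isEmpty()) {
                problems.add(key + " is missing or empty");
            }
            for (String channel : used) {
                if (!channels.contains(channel)) {
                    problems.add(key + " references undeclared channel '" + channel + "'");
                }
            }
        }
        return problems;
    }

    /** Splits a whitespace-separated property value into names, preserving order. */
    private static Set<String> names(Properties props, String key) {
        String value = props.getProperty(key, "").trim();
        if (value.isEmpty()) {
            return new LinkedHashSet<>();
        }
        return new LinkedHashSet<>(Arrays.asList(value.split("\\s+")));
    }
}
```

Under these assumptions, calling check with the content of custom_sink.conf and the agent name a1 should return an empty list, because r1 and k1 both reference the declared channel c1.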
  5. Start Flume.
    1. Run the following command to go to the installation directory of Flume:
      cd /opt/apps/FLUME/flume-current
    2. Run the following command to start Flume:
      bin/flume-ng agent --name a1 -c conf -f conf/custom_sink.conf -Dflume.root.logger=INFO,console
      Output similar to the following is returned:
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
      2021-07-16 14:49:29,526 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.StressSource.doStart(StressSource.java:169)] Stress source doStart finished
      2021-07-16 14:49:29,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:30,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:31,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:32,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:33,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
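Each event in the preceding output is printed as four rows of 16 bytes, that is, 64 bytes in total, which matches the maxBytesToLog = 64 setting in custom_sink.conf. The truncation can be sketched in plain Java as follows. DumpSketch.hexPrefix is a hypothetical helper written for this illustration. It is not Flume's EventHelper.dumpEvent, and it omits the offset and ASCII columns shown in the real output.

```java
/** Hypothetical helper that mimics only the byte limit applied when an event body is logged. */
final class DumpSketch {

    /** Returns at most maxBytes bytes of body as space-separated uppercase hex pairs. */
    static String hexPrefix(byte[] body, int maxBytes) {
        // Never read past the end of the body, and treat a negative limit as zero.
        int count = Math.min(body.length, Math.max(maxBytes, 0));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            // Mask to 0..255 so that bytes above 0x7F are not sign-extended.
            sb.append(String.format("%02X", body[i] & 0xFF));
        }
        return sb.toString();
    }
}
```

With this sketch, a body that is longer than the limit is cut off at the limit, and a body that is shorter than the limit is printed in full.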