You can configure a custom source to connect Flume to data sources that the built-in sources do not support, such as encrypted data streams, self-managed service ports, and dedicated data storage centers. This topic describes how to configure a custom source.

Prerequisites

  • An E-MapReduce (EMR) cluster is created, and Flume is selected from the optional services when you create the cluster. For more information, see Create a cluster.
  • SSH Secure File Transfer Client is installed on your computer.

Procedure

  1. Create a custom source.
    1. Add the following dependencies to the pom.xml file:
      <dependencies>
          <dependency>
              <groupId>org.apache.flume</groupId>
              <artifactId>flume-ng-core</artifactId>
              <version>1.9.0</version>
          </dependency>
      </dependencies>
      Note 1.9.0 is the Flume version used in this example. Replace it with the version of Flume that you use.
    2. Write a custom source class.
      In this example, org.example.MySource implements a source that generates events whose body is the current time in a configurable format, at a configurable interval. A minimal harness that exercises this class locally is provided after the code.
      package org.example;
      
      import java.text.SimpleDateFormat;
      import java.util.Date;
      
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.PollableSource;
      import org.apache.flume.conf.Configurable;
      import org.apache.flume.event.SimpleEvent;
      import org.apache.flume.source.AbstractSource;
      
      public class MySource extends AbstractSource implements Configurable, PollableSource {
          private String myDateFormat;
          private int myIntervalMS;
      
          @Override
          public void configure(Context context) {
              String myFormat = context.getString("dateFormat", "HH:mm:ss.SSS");
              int myInterval = context.getInteger("intervalMS", 1000);
      
              // Validate or convert the configuration values here if needed

              // Store the values for later use by the process() method
              this.myDateFormat = myFormat;
              this.myIntervalMS = myInterval;
          }
      
          @Override
          public void start() {
              // Initialize the connection to the external client
          }
      
          @Override
          public void stop() {
              // Disconnect from external client and do any additional cleanup
              // (e.g. releasing resources or nulling-out field values) ..
          }
      
          @Override
          public Status process() throws EventDeliveryException {
              Status status = null;
      
              try {
                  // This try clause includes whatever Channel/Event operations you want to do
      
                  // Receive new data
                  Event e = new SimpleEvent();
      
                  Date date = new Date();
                  SimpleDateFormat sdf = new SimpleDateFormat(myDateFormat);
                  e.setBody((sdf.format(date)).getBytes());
      
                  // Store the Event into this Source's associated Channel(s)
                  getChannelProcessor().processEvent(e);
      
                  status = Status.READY;
      
              } catch (Exception e) {
                  // Log exception, handle individual exceptions as needed
      
                  status = Status.BACKOFF;
                  e.printStackTrace();
      
              }
      
              try {
                  Thread.sleep(myIntervalMS);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
      
              return status;
          }
      
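          // When process() returns Status.BACKOFF, the Flume runner sleeps before the
          // next poll. These two methods control how that backoff sleep grows and its
          // upper bound. Returning 0 disables the extra backoff.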
          @Override
          public long getBackOffSleepIncrement() {
              return 0;
          }
      
          @Override
          public long getMaxBackOffSleepInterval() {
              return 0;
          }
      }
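      The following harness is a minimal local sketch, not part of the procedure, that wires MySource to an in-memory channel in the same way that a Flume agent does. It uses only classes from flume-ng-core and the JDK. The class name MySourceLocalTest and the property values are illustrative.
      package org.example;

      import java.util.Collections;
      import java.util.HashMap;
      import java.util.Map;

      import org.apache.flume.Channel;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.Transaction;
      import org.apache.flume.channel.ChannelProcessor;
      import org.apache.flume.channel.MemoryChannel;
      import org.apache.flume.channel.ReplicatingChannelSelector;
      import org.apache.flume.conf.Configurables;

      public class MySourceLocalTest {
          public static void main(String[] args) throws Exception {
              // Set up an in-memory channel with default settings.
              Channel channel = new MemoryChannel();
              channel.setName("c1");
              Configurables.configure(channel, new Context());
              channel.start();

              // Wire the source to the channel as a Flume agent would.
              ReplicatingChannelSelector selector = new ReplicatingChannelSelector();
              selector.setChannels(Collections.singletonList(channel));

              MySource source = new MySource();
              source.setChannelProcessor(new ChannelProcessor(selector));

              // Pass the same properties that mysource.conf sets for r1.
              Map<String, String> props = new HashMap<>();
              props.put("dateFormat", "HH:mm:ss.SSS");
              props.put("intervalMS", "2000");
              source.configure(new Context(props));

              source.start();
              source.process(); // Generates one event, then sleeps for intervalMS.

              // Read the event back from the channel inside a transaction.
              Transaction tx = channel.getTransaction();
              tx.begin();
              Event event = channel.take();
              System.out.println("Event body: " + new String(event.getBody()));
              tx.commit();
              tx.close();

              source.stop();
              channel.stop();
          }
      }
      If the harness prints a timestamp in the HH:mm:ss.SSS format, the source builds and behaves as expected before you deploy it to the cluster.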
  2. Package the custom code into a JAR file.
    In the directory that stores the pom.xml file, run the following command to build the project and generate a JAR file in the target directory:
    mvn clean package -DskipTests
  3. Use SSH Secure File Transfer Client to upload the generated JAR file to the /opt/apps/FLUME/flume-current/lib directory of Flume.
    Note If your cluster is not an EMR cluster, upload the JAR file to the lib directory of your Flume installation.
  4. Add configurations.
    1. Log on to the cluster in SSH mode. For more information, see Log on to a cluster.
    2. Run the following command to go to the conf directory of Flume:
      cd /opt/apps/FLUME/flume-current/conf
    3. Run the following command to add a configuration file:
      vim mysource.conf
      Note In this example, the name of the configuration file is mysource.conf. You can use a different file name.
    4. Add the following content to the mysource.conf configuration file.
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      a1.sources.r1.type = org.example.MySource
      a1.sources.r1.dateFormat = HH:mm:ss.SSS
      a1.sources.r1.intervalMS = 2000
      
      a1.sinks.k1.type = logger
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      Note In the preceding configuration, the type of the custom source is set to the fully qualified name of the custom source class (org.example.MySource). dateFormat specifies the format of the timestamp that is written to each event, and intervalMS specifies the interval at which events are generated. Unit: milliseconds.
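      Because MySource passes the dateFormat value directly to java.text.SimpleDateFormat, any SimpleDateFormat pattern can be used. The following sketch, in which the class name DateFormatPreview is illustrative, previews the event body that would be generated for a different pattern:
      package org.example;

      import java.text.SimpleDateFormat;
      import java.util.Date;

      public class DateFormatPreview {
          public static void main(String[] args) {
              // Preview the event body for an alternative dateFormat pattern.
              SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
              System.out.println(sdf.format(new Date())); // Example: 2021-07-16 14:44:27.705
          }
      }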
  5. Start Flume.
    1. Run the following command to go to the Flume installation directory:
      cd /opt/apps/FLUME/flume-current
    2. Run the following command to start Flume. The value of the --name parameter must match the agent name a1 that is used in the configuration file:
      bin/flume-ng agent --name a1 -c conf -f conf/mysource.conf -Dflume.root.logger=INFO,console
      Output similar to the following is returned. An event that contains the current time is generated every two seconds, as specified by the intervalMS value of 2000:
      2021-07-16 14:44:27,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
      2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
      2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
      2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
      2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
      2021-07-16 14:44:27,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 37 2E 37 30 35             14:44:27.705 }
      2021-07-16 14:44:29,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 39 2E 37 30 39             14:44:29.709 }
      2021-07-16 14:44:31,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 31 2E 37 30 39             14:44:31.709 }
      2021-07-16 14:44:33,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 33 2E 37 31 30             14:44:33.710 }
      2021-07-16 14:44:35,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 35 2E 37 31 30             14:44:35.710 }
      2021-07-16 14:44:37,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 37 2E 37 31 30             14:44:37.710 }
      2021-07-16 14:44:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 39 2E 37 31 30             14:44:39.710 }