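A custom Source lets you extend Flume to ingest data from additional sources, such as encrypted data streams, self-hosted service ports, and proprietary data storage centers. This topic walks through an example of creating a custom Source.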

Prerequisites

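  • A cluster has been created with the Flume service selected. For more information, see Create a cluster.
  • A file transfer tool (SSH Secure File Transfer Client) is installed on your local machine.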

Procedure

  1. Create a custom Source.
    1. Add the pom dependency.
      <dependencies>
          <dependency>
              <groupId>org.apache.flume</groupId>
              <artifactId>flume-ng-core</artifactId>
              <version>1.9.0</version>
          </dependency>
      </dependencies>
      Note: 1.9.0 is the Flume version. Replace it with the version of Flume that you are running.
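    2. Write the custom Source class.
      org.example.MySource implements a pollable Source that, every intervalMS milliseconds, generates an event whose body is the current time formatted with the dateFormat pattern.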
      package org.example;
      
      import java.nio.charset.StandardCharsets;
      import java.text.SimpleDateFormat;
      import java.util.Date;
      
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.PollableSource;
      import org.apache.flume.conf.Configurable;
      import org.apache.flume.event.SimpleEvent;
      import org.apache.flume.source.AbstractSource;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class MySource extends AbstractSource implements Configurable, PollableSource {
          private static final Logger LOG = LoggerFactory.getLogger(MySource.class);
      
          private String myDateFormat;
          private int myIntervalMS;
      
          @Override
          public void configure(Context context) {
              // Read the custom properties (a1.sources.r1.dateFormat / intervalMS),
              // falling back to the defaults when they are not set
              String myFormat = context.getString("dateFormat", "HH:mm:ss.SSS");
              int myInterval = context.getInteger("intervalMS", 1000);
      
              // Validate early so that a bad configuration is reported when the agent loads it
              new SimpleDateFormat(myFormat); // throws IllegalArgumentException for an invalid pattern
              if (myInterval <= 0) {
                  throw new IllegalArgumentException("intervalMS must be positive, got " + myInterval);
              }
      
              // Store the values for later use by process()
              this.myDateFormat = myFormat;
              this.myIntervalMS = myInterval;
          }
      
          @Override
          public void start() {
              // Initialize the connection to the external data source here
              super.start();
          }
      
          @Override
          public void stop() {
              // Disconnect from the external data source and release resources here
              super.stop();
          }
      
          @Override
          public Status process() throws EventDeliveryException {
              Status status;
      
              try {
                  // Build an event whose body is the current time in the configured format
                  SimpleDateFormat sdf = new SimpleDateFormat(myDateFormat);
                  Event e = new SimpleEvent();
                  e.setBody(sdf.format(new Date()).getBytes(StandardCharsets.UTF_8));
      
                  // Store the event into the channel(s) associated with this Source
                  getChannelProcessor().processEvent(e);
                  status = Status.READY;
              } catch (Exception e) {
                  // Log the failure and tell the runner to back off
                  LOG.error("Failed to deliver event", e);
                  status = Status.BACKOFF;
              }
      
              try {
                  // Wait for the configured interval before the next event is generated
                  Thread.sleep(myIntervalMS);
              } catch (InterruptedException e) {
                  // Restore the interrupt flag so that the runner thread can stop cleanly
                  Thread.currentThread().interrupt();
              }
      
              return status;
          }
      
          @Override
          public long getBackOffSleepIncrement() {
              // No extra backoff: process() already sleeps intervalMS on every call
              return 0;
          }
      
          @Override
          public long getMaxBackOffSleepInterval() {
              return 0;
          }
      }
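      The two backoff methods only matter when process() returns Status.BACKOFF. The following is a simplified model, based on our reading of Flume's PollableSourceRunner, of how the runner computes the extra pause after consecutive BACKOFF results: the consecutive count multiplied by getBackOffSleepIncrement(), capped at getMaxBackOffSleepInterval(). BackoffModel and backoffSleepMs are hypothetical names used only for illustration.

      ```java
      // Simplified model (assumption) of how Flume's PollableSourceRunner decides
      // how long to pause after a Source's process() returns Status.BACKOFF.
      // BackoffModel is a hypothetical, illustrative class, not part of Flume.
      public class BackoffModel {

          /**
           * @param consecutiveBackoffs number of BACKOFF results in a row, including this one
           * @param increment           value returned by getBackOffSleepIncrement()
           * @param max                 value returned by getMaxBackOffSleepInterval()
           * @return milliseconds the runner would pause before calling process() again
           */
          static long backoffSleepMs(long consecutiveBackoffs, long increment, long max) {
              // Linear growth, capped at the configured maximum
              return Math.min(consecutiveBackoffs * increment, max);
          }

          public static void main(String[] args) {
              // MySource returns 0 from both methods, so the runner adds no extra pause
              System.out.println(backoffSleepMs(3, 0, 0)); // prints 0

              // Hypothetical values: 1000 ms increment, 5000 ms cap
              for (long n = 1; n <= 7; n++) {
                  System.out.print(backoffSleepMs(n, 1000, 5000) + " ");
              }
              System.out.println();
          }
      }
      ```

      Because MySource returns 0 from both methods, the only delay between calls comes from Thread.sleep(intervalMS) inside process(). If you remove that sleep from your own Source, consider returning non-zero values so that a failing channel is not retried in a tight loop.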
  2. Package the custom code into a JAR file.
    In the directory that contains pom.xml, run the following command to build the JAR file:
    mvn clean package -DskipTests
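  3. Use the file transfer tool to upload the generated JAR file (by default, Maven writes it to the target directory) to the /usr/lib/flume-current/lib directory of Flume.
    Note: If you are not using an EMR cluster, upload the JAR file to the lib directory of your actual Flume installation.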
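  4. Add the configuration.
    1. Log on to the cluster over SSH. For more information, see Log on to a cluster.
    2. Run the following command to go to the /usr/lib/flume-current/conf directory:
      cd /usr/lib/flume-current/conf
    3. Run the following command to create a configuration file:
      vim mysource.conf
      Note: This example uses mysource.conf as the configuration file name. You can choose any name.
    4. Add the following content to mysource.conf: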
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      a1.sources.r1.type = org.example.MySource
      a1.sources.r1.dateFormat = HH:mm:ss.SSS
      a1.sources.r1.intervalMS = 2000
      
      a1.sinks.k1.type = logger
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
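      Note: a1.sources.r1.type is the fully qualified class name of the custom Source. dateFormat specifies the date format of the event body, and intervalMS specifies the interval between events, in milliseconds.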
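      Flume passes each Source only the properties under its own prefix, with that prefix removed, so a1.sources.r1.dateFormat arrives in configure() as dateFormat. The stand-alone sketch below models that lookup with java.util.Properties. SourcePropertiesDemo and its methods are hypothetical and are not Flume's actual configuration code; the defaults mirror those used in MySource.configure().

      ```java
      import java.io.IOException;
      import java.io.StringReader;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.Properties;

      // Hypothetical, simplified model of how agent properties reach MySource.configure():
      // keys under "a1.sources.r1." are handed to the Source with that prefix stripped.
      // This is NOT Flume's own implementation.
      public class SourcePropertiesDemo {

          // Collects every key that starts with prefix and strips the prefix
          static Map<String, String> subProperties(Properties props, String prefix) {
              Map<String, String> result = new HashMap<>();
              for (String key : props.stringPropertyNames()) {
                  if (key.startsWith(prefix)) {
                      result.put(key.substring(prefix.length()), props.getProperty(key));
                  }
              }
              return result;
          }

          // Same default as context.getInteger("intervalMS", 1000)
          static int intervalMs(Map<String, String> ctx) {
              String v = ctx.get("intervalMS");
              return v == null ? 1000 : Integer.parseInt(v.trim());
          }

          // Same default as context.getString("dateFormat", "HH:mm:ss.SSS")
          static String dateFormat(Map<String, String> ctx) {
              return ctx.getOrDefault("dateFormat", "HH:mm:ss.SSS");
          }

          public static void main(String[] args) throws IOException {
              String conf = String.join("\n",
                      "a1.sources.r1.type = org.example.MySource",
                      "a1.sources.r1.dateFormat = HH:mm:ss.SSS",
                      "a1.sources.r1.intervalMS = 2000",
                      "a1.sinks.k1.type = logger");
              Properties props = new Properties();
              props.load(new StringReader(conf));

              Map<String, String> ctx = subProperties(props, "a1.sources.r1.");
              System.out.println(dateFormat(ctx) + " every " + intervalMs(ctx) + " ms");
              // prints: HH:mm:ss.SSS every 2000 ms
          }
      }
      ```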
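  5. Start Flume.
    1. Run the following command to go to the /usr/lib/flume-current directory:
      cd /usr/lib/flume-current
    2. Run the following command to start Flume:
      bin/flume-ng agent --name a1 -c conf -f conf/mysource.conf -Dflume.root.logger=INFO,console
      Output similar to the following is returned. The logger sink prints one event every 2 seconds, showing the body both as hex bytes and as text: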
      2021-07-16 14:44:27,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
      2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
      2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
      2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
      2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
      2021-07-16 14:44:27,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 37 2E 37 30 35             14:44:27.705 }
      2021-07-16 14:44:29,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 39 2E 37 30 39             14:44:29.709 }
      2021-07-16 14:44:31,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 31 2E 37 30 39             14:44:31.709 }
      2021-07-16 14:44:33,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 33 2E 37 31 30             14:44:33.710 }
      2021-07-16 14:44:35,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 35 2E 37 31 30             14:44:35.710 }
      2021-07-16 14:44:37,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 37 2E 37 31 30             14:44:37.710 }
      2021-07-16 14:44:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 39 2E 37 31 30             14:44:39.710 }
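      In the output above, each event body appears twice: as hex bytes and as text. The body is the formatted time as bytes, and every character in the HH:mm:ss.SSS pattern is ASCII, so each hex pair is one character of the timestamp. The sketch below, using a hypothetical BodyHex class, converts between the two forms so that you can check a logged body by hand; the hex layout simply mirrors what the sample output shows.

      ```java
      import java.nio.charset.StandardCharsets;

      // Hypothetical helper that mirrors the hex column in the sample LoggerSink output.
      // Assumes the body is UTF-8 text, as produced by MySource.
      public class BodyHex {

          // Formats each byte as two upper-case hex digits separated by single spaces
          static String toHex(byte[] body) {
              StringBuilder sb = new StringBuilder();
              for (byte b : body) {
                  if (sb.length() > 0) {
                      sb.append(' ');
                  }
                  sb.append(String.format("%02X", b & 0xFF));
              }
              return sb.toString();
          }

          // Turns a hex column such as "31 34 3A" back into the original text
          static String fromHex(String hex) {
              String[] parts = hex.trim().split("\\s+");
              byte[] bytes = new byte[parts.length];
              for (int i = 0; i < parts.length; i++) {
                  bytes[i] = (byte) Integer.parseInt(parts[i], 16);
              }
              return new String(bytes, StandardCharsets.UTF_8);
          }

          public static void main(String[] args) {
              System.out.println(toHex("14:44:27.705".getBytes(StandardCharsets.UTF_8)));
              // prints: 31 34 3A 34 34 3A 32 37 2E 37 30 35
              System.out.println(fromHex("31 34 3A 34 34 3A 32 39 2E 37 30 39"));
              // prints: 14:44:29.709
          }
      }
      ```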