You can configure a custom source to connect to more data sources, such as encrypted data streams, self-managed service ports, and dedicated data storage centers. This topic describes how to configure a custom source.
Prerequisites
- An E-MapReduce (EMR) cluster is created, and Flume is selected from the optional services when you create the cluster. For more information, see Create a cluster.
- SSH Secure File Transfer Client is installed on your computer.
Procedure
- Create a custom source.
- Add the following dependencies to the pom.xml file:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
Note: 1.9.0 is a Flume version. Replace 1.9.0 with the Flume version that you use.
- Write a custom source class.
org.example.MySource implements a source that periodically generates an event whose body is the current time in a configurable format.

package org.example;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String myDateFormat;
    private int myIntervalMS;

    @Override
    public void configure(Context context) {
        // Read the source properties from the agent configuration, with defaults
        String myFormat = context.getString("dateFormat", "HH:mm:ss.SSS");
        int myInterval = context.getInteger("intervalMS", 1000);

        // Process the values if needed (e.g. validation, convert to another type, ...)
        // Store the values for later retrieval by process() method
        this.myDateFormat = myFormat;
        this.myIntervalMS = myInterval;
    }

    @Override
    public void start() {
        // Initialize the connection to the external client
    }

    @Override
    public void stop() {
        // Disconnect from external client and do any additional cleanup
        // (e.g. releasing resources or nulling-out field values)
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        try {
            // This try clause includes whatever Channel/Event operations you want to do

            // Receive new data: here, the current time in the configured format
            Event e = new SimpleEvent();
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat(myDateFormat);
            e.setBody(sdf.format(date).getBytes(StandardCharsets.UTF_8));

            // Store the Event into this Source's associated Channel(s)
            getChannelProcessor().processEvent(e);

            status = Status.READY;
        } catch (Exception e) {
            // Log exception, handle individual exceptions as needed
            status = Status.BACKOFF;
            e.printStackTrace();
        }

        try {
            // Wait for the configured interval before the next event
            Thread.sleep(myIntervalMS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that the caller can react to it
            Thread.currentThread().interrupt();
        }

        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
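To preview what the event body looks like for a given dateFormat value without starting Flume, the formatting step can be run on its own. The following is a minimal sketch under stated assumptions: EventBodyPreview and bodyFor are hypothetical names that are not part of Flume or of the example above, and the instant and time zone are fixed only to make the result reproducible.

```java
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class EventBodyPreview {
    // Hypothetical helper: applies the same SimpleDateFormat step that
    // MySource.process() uses, but for a caller-supplied instant and time zone.
    static byte[] bodyFor(String dateFormat, long epochMillis, TimeZone zone) {
        SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
        sdf.setTimeZone(zone);
        return sdf.format(new Date(epochMillis)).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 1500 ms after the epoch, rendered in UTC
        byte[] body = bodyFor("HH:mm:ss.SSS", 1500L, TimeZone.getTimeZone("UTC"));
        System.out.println(new String(body, StandardCharsets.UTF_8)); // 00:00:01.500
    }
}
```

An invalid pattern makes the SimpleDateFormat constructor throw an IllegalArgumentException, so a check of this kind could also be used to validate the dateFormat value before deployment.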
- Package the custom code into a JAR file. In the directory where the pom.xml file is stored, run the following command to create a JAR file:
mvn clean package -DskipTests
- Use SSH Secure File Transfer Client to upload the generated JAR file to the /opt/apps/FLUME/flume-current/lib directory of Flume.
Note: If your cluster is not an EMR cluster, upload the JAR file to the lib directory in the actual installation directory of Flume.
- Add configurations.
- Log on to the cluster in SSH mode. For more information, see Log on to a cluster.
- Run the following command to go to the conf directory of Flume:
cd /opt/apps/FLUME/flume-current/conf
- Run the following command to add a configuration file:
vim mysource.conf
Note: In this example, the name of the configuration file is mysource.conf. You can use a different file name.
- Add the following content to the mysource.conf configuration file.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.example.MySource
a1.sources.r1.dateFormat = HH:mm:ss.SSS
a1.sources.r1.intervalMS = 2000

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: In the configuration, dateFormat specifies the date format of the event body, and intervalMS specifies the interval between events, in milliseconds.
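Conceptually, the keys under the a1.sources.r1. prefix are the values that MySource reads in configure() through context.getString("dateFormat", ...) and context.getInteger("intervalMS", ...). The following is a minimal sketch of that prefix matching that uses only java.util.Properties. SourceProps, parse, and forSource are hypothetical names, and this is an illustration, not Flume's own configuration parser.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class SourceProps {
    // Hypothetical helper: loads configuration text in .properties syntax.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Hypothetical helper: returns the entries under "<agent>.sources.<source>."
    // with the prefix removed, sorted by key.
    static Map<String, String> forSource(Properties all, String agent, String source) {
        String prefix = agent + ".sources." + source + ".";
        Map<String, String> result = new TreeMap<>();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String conf = "a1.sources.r1.type = org.example.MySource\n"
                + "a1.sources.r1.dateFormat = HH:mm:ss.SSS\n"
                + "a1.sources.r1.intervalMS = 2000\n"
                + "a1.sinks.k1.type = logger\n";
        // Prints only the r1 source properties; the sink entry is filtered out
        System.out.println(forSource(parse(conf), "a1", "r1"));
    }
}
```

With the values from mysource.conf, intervalMS resolves to 2000, which overrides the default of 1000 in the source class and matches the two-second spacing of the events in the output.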
- Start Flume.
- Run the following command to go to the Flume installation directory:
cd /opt/apps/FLUME/flume-current
- Run the following command to start Flume:
bin/flume-ng agent --name a1 -c conf -f conf/mysource.conf -Dflume.root.logger=INFO,console
The following output is returned:
2021-07-16 14:44:27,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2021-07-16 14:44:27,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 37 2E 37 30 35 14:44:27.705 }
2021-07-16 14:44:29,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 39 2E 37 30 39 14:44:29.709 }
2021-07-16 14:44:31,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 31 2E 37 30 39 14:44:31.709 }
2021-07-16 14:44:33,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 33 2E 37 31 30 14:44:33.710 }
2021-07-16 14:44:35,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 35 2E 37 31 30 14:44:35.710 }
2021-07-16 14:44:37,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 37 2E 37 31 30 14:44:37.710 }
2021-07-16 14:44:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 39 2E 37 31 30 14:44:39.710 }
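In each Event line of the logger sink output, the body is shown twice: first as hexadecimal byte values and then as text. To check that the two agree, the hexadecimal part can be decoded by hand. The following is a minimal sketch; HexBodyDecoder and decode are hypothetical names, and the sketch assumes that the body contains only ASCII characters, as it does in this example.

```java
import java.nio.charset.StandardCharsets;

public class HexBodyDecoder {
    // Hypothetical helper: converts a space-separated hex dump such as
    // "31 34 3A" into the text that those bytes represent.
    static String decode(String hexDump) {
        String[] parts = hexDump.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // Body of the first event in the output above
        System.out.println(decode("31 34 3A 34 34 3A 32 37 2E 37 30 35")); // 14:44:27.705
    }
}
```

The decoded text is the timestamp that MySource wrote into the event body, formatted with the configured dateFormat value.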