Introduction of Flume NG and Configuration Practice

Posted time: Sep 6, 2016 9:36 AM
Summary: We have recently been working on log collection. After studying the principles and implementation of Flume, I am reposting an article to share with you.
    Flume, a real-time log collection system developed by Cloudera, has been recognized and widely adopted in the industry. The initial releases of Flume are now collectively called Flume OG (original generation) and belong to Cloudera. As Flume's functionality expanded, however, its shortcomings became apparent: a bloated code base, poorly designed core components, and non-standard core configuration. In the last Flume OG release, 0.9.4, unstable log transfer was a particularly serious problem. To solve these problems, Cloudera completed FLUME-728 on October 22, 2011, a landmark change that refactored the core components, core configuration and code architecture; the refactored versions are collectively called Flume NG (next generation). Another reason for the change was that Flume was brought into Apache, and Cloudera Flume was renamed Apache Flume. The IBM article "Flume NG: The First Revolution in the History of Flume" describes the changes from Flume OG to Flume NG from the perspective of basic components and user experience. We will not repeat them in detail here, but the main changes in Flume NG (1.x.x) are:
• The sources and sinks are linked using channels.
• There are two main channels:
a) in-memory channel, providing non-persistent support at high speed
b) JDBC-based channel, providing persistent support
• Nodes are no longer divided into logical and physical nodes; all physical nodes are simply called agents, and each agent can run zero or more sources and sinks.
• The master node and the dependency on ZooKeeper are no longer needed, and the configuration files are simplified.
• It is plug-in based, with some plug-ins oriented to users and others to tool or system developers.
• Thrift or Avro Flume sources can be used to send events from Flume 0.9.4 to Flume 1.x.
Note: The Flume version used in this article is Flume-1.4.0-cdh4.7.0. It can be used as soon as the archive is unpacked; no further installation is required.
1. Some Core Concepts of Flume:

1.1 Data stream model
    The agent is the smallest independently running unit in Flume, and each agent is a JVM. A single agent consists of three components: Source, Sink and Channel, as shown in the figure below:
    Events run through the Flume data stream from beginning to end. The Event is the basic data unit of Flume, and it carries both the log data (as a byte array) and header information. Events are generated by a source external to the agent, such as the web server in the figure above. After capturing an event, the Source applies any source-specific formatting and then pushes the event to one or more channels. You can think of the channel as a buffer: it holds the event until the Sink has finished processing it. The Sink is responsible for persisting the log or pushing the event to another Source.
    The design is straightforward, but note that Flume provides many built-in types of Source, Channel and Sink, and these can be combined freely. The combination is very flexible and is determined by the user's configuration file. For example, a Channel can hold events temporarily in memory or persist them on the local disk, and a Sink can write logs to HDFS, HBase, or even another Source.
    It would be wrong to think that Flume only has these capabilities. Flume allows the user to establish multistage streams, that is, multiple agents can work cooperatively, and it also supports Fan-in, Fan-out, Contextual Routing and Backup Routes, as shown in the figure below:
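    As a minimal sketch of fan-out combined with a multistage stream, the configuration below lets one source copy every event to two channels: one feeds a local logger sink, the other an Avro sink that forwards to the next agent. The agent name a1, the component names, and the host collector.example.com are hypothetical placeholders, not values from this article.

```properties
# Hypothetical agent "a1": one source fans out to two channels.
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# "replicating" is the default channel selector: each event is copied to every listed channel.
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

a1.channels.c1.type = memory
a1.channels.c2.type = memory

# k1 logs events locally.
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

# k2 forwards events to the Avro source of a downstream agent (placeholder host and port).
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = collector.example.com
a1.sinks.k2.port = 41414
a1.sinks.k2.channel = c2
```

    The downstream agent would need an Avro source listening on the same port for the second branch to deliver anything.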

1.2 High reliability
    High reliability is a must for software that runs in a production environment.
    Within a single agent, Flume uses transaction-based data transfer to ensure that events are delivered reliably. The operations of the Source and of the Sink are each wrapped in a transaction. An event is stored in the Channel and is not removed until it has been processed. This is the point-to-point reliability mechanism provided by Flume.
    In a multistage stream, the sink of the previous agent and the source of the next agent each rely on their own transactions to guarantee the reliability of the data.
1.3 Recoverability
    Recoverability is also provided by the Channel. FileChannel is recommended: it stores events persistently in the local file system, at the cost of lower performance.
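    A minimal sketch of a FileChannel definition follows. The agent name a1, the channel name c1, and both directories are hypothetical; the sources and sinks that would attach to the channel are omitted.

```properties
# Hypothetical agent "a1" with a durable, file-backed channel "c1".
a1.channels = c1
a1.channels.c1.type = file
# Directory holding the channel checkpoint (placeholder path).
a1.channels.c1.checkpointDir = /var/flume/checkpoint
# One or more comma-separated directories holding the event data files (placeholder path).
a1.channels.c1.dataDirs = /var/flume/data
```

    Events buffered this way should survive an agent restart, which a memory channel cannot offer.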
2. Introduction of the Overall Architecture of Flume
    Overall, Flume has a three-tier architecture of source --> channel --> sink, similar to a producer-consumer architecture. The tiers are decoupled, and data is passed between them, through a queue (the channel).
• Source: collects the log data, packages it into transactions and events, and delivers them to the channels.
• Channel: mainly acts as a queue, providing simple buffering of the data delivered by the sources.
• Sink: takes data out of the channels and stores it in the corresponding file system or database, or submits it to a remote server.
    The approach that requires the fewest changes to an existing program is to read the log files the program already writes, which allows near-seamless integration without modifying the program.
    There are two main ways for a source to read files directly:
2.1 Exec source
    An exec source obtains its data by running a Unix command; the most commonly used command is tail -F [file].
    This gives real-time transfer, but data may be lost if Flume is not running or the command fails, and resuming from a breakpoint is not supported. Because the source does not record the position at which it stopped reading, it cannot know where to start on the next read, which matters especially while the log file keeps growing: if the Flume source crashes, log content written before the source restarts will not be read. Flume does have an execStream extension with which you can write your own tool to watch the log for appended content and send those additions to the Flume node, which then passes them to the sink node. It would be ideal if tail-type sources could resume transferring from where they stopped once the node starts up again after a crash.
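    The exec source has a few settings that soften, but do not remove, this weakness. The sketch below assumes a hypothetical agent a1, source r1, channel c1 and log path. Note that restart only relaunches the command if it exits; it does not recover lines that were written while the agent itself was down.

```properties
# Hypothetical agent "a1": exec source that tails an application log.
a1.sources = r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/app/app.log
# Relaunch the command if it dies (default is false).
a1.sources.r1.restart = true
# Wait this many milliseconds before relaunching.
a1.sources.r1.restartThrottle = 10000
# Also log the command's stderr, which helps when diagnosing a failing command.
a1.sources.r1.logStdErr = true
a1.sources.r1.channels = c1
```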
2.2 Spooling Directory Source
    SpoolSource monitors the configured directory for newly added files and reads their data in near real time. Note the following two points:
    1) Files copied into the spool directory must not be reopened for editing.
    2) The spool directory must not contain subdirectories.
    In practice, this source can be combined with log4j: configure log4j to roll the log file once per minute and copy the rolled files into the monitored spool directory. log4j has a TimeRolling plug-in that can copy the rolled files into the spool directory, which gives close to real-time monitoring. Once a file has been transferred, Flume renames it with the extension .COMPLETED (the extension can be changed in the configuration file).
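    A minimal sketch of a spooling directory source, assuming a hypothetical agent a1, source r1, channel c1 and spool path. The fileSuffix line only restates the default to show where the completion extension is set.

```properties
# Hypothetical agent "a1": watch a directory for finished log files.
a1.sources = r1
a1.sources.r1.type = spooldir
# Directory into which the rolled log files are copied (placeholder path).
a1.sources.r1.spoolDir = /var/log/app/spool
# Suffix appended to a file once it has been fully ingested (.COMPLETED is the default).
a1.sources.r1.fileSuffix = .COMPLETED
# Keep ingested files on disk; "immediate" would delete them instead.
a1.sources.r1.deletePolicy = never
# Add a header recording the path of the file each event came from.
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
```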
    Comparison between ExecSource and SpoolSource: ExecSource collects logs in real time, but it cannot collect log data while Flume is not running or when the command fails, so it cannot guarantee that the log data is complete. SpoolSource cannot collect data in real time, but if the log files are rolled every minute it comes close to real time. If the application cannot roll its log files by the minute, the two collection methods can be combined.
    Several Channel types are available: MemoryChannel, JDBC Channel, MemoryRecoverChannel and FileChannel. MemoryChannel offers high-speed transfer but cannot guarantee data integrity. The official documentation recommends replacing MemoryRecoverChannel with FileChannel. FileChannel guarantees the integrity and consistency of the data. When configuring FileChannel, it is recommended to give it a directory separate from the one that holds the program's log files, ideally on a different disk, to improve efficiency.
    A Sink can store data in the file system, a database, or Hadoop. For small volumes of log data, it is advisable to store it in the file system and save it at a set time interval. For large volumes, it is advisable to store it in Hadoop so that it is easy to analyze later.
3. Common Architecture/Function Configuration Examples
3.1 A simple example: Single-node Flume configuration
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Store the above configuration as: example.conf
Then we can start up Flume:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

PS: -Dflume.root.logger=INFO,console is for debugging only. Do not copy it blindly into a production environment, or a large volume of logs will be written to the terminal.
Next, open another shell terminal window, connect to the listening port with telnet, and send a message to check the effect:
$ telnet localhost 44444
Connected to localhost.localdomain (
Escape character is '^]'.
Hello world! <ENTER>

The following information will be output from the Flume terminal window, indicating success:
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

The first Flume agent is now deployed successfully!
3.2 Single-node Flume writing directly to HDFS
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to Connect it to channel ch1.
#agent1.sources.avro-source1.channels = ch1
#agent1.sources.avro-source1.type = avro
#agent1.sources.avro-source1.bind =
#agent1.sources.avro-source1.port = 41414
#agent1.sources.avro-source1.threads = 5

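# Define an exec source that tails a file
# (the source keeps the name avro-source1 from the commented-out block above)
agent1.sources.avro-source1.type = exec
agent1.sources.avro-source1.shell = /bin/bash -c
agent1.sources.avro-source1.command = tail -n +0 -F /home/storm/tmp/id.txt
agent1.sources.avro-source1.channels = ch1
# threads is an Avro source setting; the exec source does not use it
#agent1.sources.avro-source1.threads = 5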

# Define an HDFS sink that writes all events it receives to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 1000000
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.batchSize = 1000
agent1.sinks.log-sink1.hdfs.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

Run the following command, then check the result in HDFS.
../bin/flume-ng agent --conf ../conf/ -f flume_directHDFS.conf -n agent1 -Dflume.root.logger=INFO,console

PS: For this kind of requirement in a real environment, it is better to tail the logs on multiple agent clients and send them to a collector, which gathers the data and writes it to HDFS in one place. A new file is generated when the HDFS file exceeds a certain size or a specified time interval elapses.
Flume implements two triggers for this. SizeTrigger keeps a running total of the bytes written to the HDFS output stream; once the total exceeds the configured size, it creates a new file and output stream, directs subsequent writes to the new stream, and closes the previous one. TimeTrigger starts a timer; when the timer fires, it likewise creates a new file and output stream, redirects new writes to it, and closes the previous one.
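In the HDFS sink configuration, these two triggers correspond to the hdfs.rollSize and hdfs.rollInterval settings. The sketch below enables both, so a file is rolled by whichever limit is reached first. The agent name a1, sink name k1, channel c1, the NameNode host and the chosen values are hypothetical.

```properties
# Hypothetical agent "a1": HDFS sink that rolls files by size or by time.
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Target directory (placeholder NameNode host and path).
a1.sinks.k1.hdfs.path = hdfs://namenode.example.com/flume/events
a1.sinks.k1.hdfs.fileType = DataStream
# Size trigger: roll once about 128 MB (in bytes) has been written; 0 disables it.
a1.sinks.k1.hdfs.rollSize = 134217728
# Time trigger: roll after 600 seconds; 0 disables it.
a1.sinks.k1.hdfs.rollInterval = 600
# Disable rolling by event count so only the two triggers above apply.
a1.sinks.k1.hdfs.rollCount = 0
```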
3.3 A common architecture: Multi-agent consolidated writing into HDFS
A fan-in flow using Avro RPC to consolidate events in one place
3.3.1 Configure the Flume client on each web server logging machine
# clientMainAgent
clientMainAgent.channels = c1
clientMainAgent.sources  = s1
clientMainAgent.sinks    = k1 k2
# clientMainAgent sinks group
clientMainAgent.sinkgroups = g1
# clientMainAgent Spooling Directory Source
clientMainAgent.sources.s1.type = spooldir
clientMainAgent.sources.s1.spoolDir = /dsap/rawdata/
clientMainAgent.sources.s1.fileHeader = true
clientMainAgent.sources.s1.deletePolicy = immediate
clientMainAgent.sources.s1.batchSize = 1000
clientMainAgent.sources.s1.channels = c1
clientMainAgent.sources.s1.deserializer.maxLineLength = 1048576
# clientMainAgent FileChannel
clientMainAgent.channels.c1.type = file
clientMainAgent.channels.c1.checkpointDir = /var/flume/fchannel/spool/checkpoint
clientMainAgent.channels.c1.dataDirs = /var/flume/fchannel/spool/data
clientMainAgent.channels.c1.capacity = 200000000
clientMainAgent.channels.c1.keep-alive = 30
clientMainAgent.channels.c1.write-timeout = 30
# clientMainAgent Sinks
# k1 sink
clientMainAgent.sinks.k1.channel = c1
clientMainAgent.sinks.k1.type = avro
# connect to CollectorMainAgent
clientMainAgent.sinks.k1.hostname = flume115
clientMainAgent.sinks.k1.port = 41415
# k2 sink
clientMainAgent.sinks.k2.channel = c1
clientMainAgent.sinks.k2.type = avro
# connect to CollectorBackupAgent
clientMainAgent.sinks.k2.hostname = flume116
clientMainAgent.sinks.k2.port = 41415
# clientMainAgent sinks group
clientMainAgent.sinkgroups.g1.sinks = k1 k2
# load_balance type
clientMainAgent.sinkgroups.g1.processor.type = load_balance
clientMainAgent.sinkgroups.g1.processor.backoff   = true
clientMainAgent.sinkgroups.g1.processor.selector  = random

../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n clientMainAgent -Dflume.root.logger=DEBUG,console

3.3.2 Configure Flume server at the consolidation node
# collectorMainAgent
collectorMainAgent.channels = c2
collectorMainAgent.sources  = s2
collectorMainAgent.sinks    = k1 k2
# collectorMainAgent AvroSource
collectorMainAgent.sources.s2.type = avro
collectorMainAgent.sources.s2.bind = flume115
collectorMainAgent.sources.s2.port = 41415
collectorMainAgent.sources.s2.channels = c2

# collectorMainAgent FileChannel
collectorMainAgent.channels.c2.type = file
collectorMainAgent.channels.c2.checkpointDir = /opt/var/flume/fchannel/spool/checkpoint
collectorMainAgent.channels.c2.dataDirs = /opt/var/flume/fchannel/spool/data,/work/flume/fchannel/spool/data
collectorMainAgent.channels.c2.capacity = 200000000
# collectorMainAgent hdfsSink
collectorMainAgent.sinks.k2.type = hdfs
collectorMainAgent.sinks.k2.channel = c2
collectorMainAgent.sinks.k2.hdfs.path = hdfs://db-cdh-cluster/flume%{dir}
collectorMainAgent.sinks.k2.hdfs.filePrefix = k2_%{file}
collectorMainAgent.sinks.k2.hdfs.inUsePrefix = _
collectorMainAgent.sinks.k2.hdfs.inUseSuffix = .tmp
collectorMainAgent.sinks.k2.hdfs.rollSize = 0
collectorMainAgent.sinks.k2.hdfs.rollCount = 0
collectorMainAgent.sinks.k2.hdfs.rollInterval = 240
collectorMainAgent.sinks.k2.hdfs.writeFormat = Text
collectorMainAgent.sinks.k2.hdfs.fileType = DataStream
collectorMainAgent.sinks.k2.hdfs.batchSize = 6000
collectorMainAgent.sinks.k2.hdfs.callTimeout = 60000
collectorMainAgent.sinks.k1.type = hdfs
collectorMainAgent.sinks.k1.channel = c2
collectorMainAgent.sinks.k1.hdfs.path = hdfs://db-cdh-cluster/flume%{dir}
collectorMainAgent.sinks.k1.hdfs.filePrefix = k1_%{file}
collectorMainAgent.sinks.k1.hdfs.inUsePrefix = _
collectorMainAgent.sinks.k1.hdfs.inUseSuffix = .tmp
collectorMainAgent.sinks.k1.hdfs.rollSize = 0
collectorMainAgent.sinks.k1.hdfs.rollCount = 0
collectorMainAgent.sinks.k1.hdfs.rollInterval = 240
collectorMainAgent.sinks.k1.hdfs.writeFormat = Text
collectorMainAgent.sinks.k1.hdfs.fileType = DataStream
collectorMainAgent.sinks.k1.hdfs.batchSize = 6000
collectorMainAgent.sinks.k1.hdfs.callTimeout = 60000

../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n collectorMainAgent -Dflume.root.logger=DEBUG,console

The architecture in the above example resembles a client/server (C/S) model: the Flume agent nodes first aggregate the logs from each machine to the consolidation nodes, which then write them into HDFS in a unified manner. The example uses the load-balancing mode; a high-availability (failover) mode can be configured instead.
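As a sketch of the high-availability variant, the sink group on the client could use the failover processor instead of load_balance. The names clientMainAgent, g1, k1 and k2 come from the client configuration in 3.3.1; the priority and penalty values are illustrative assumptions.

```properties
# Failover variant of the client sink group (replaces the load_balance settings).
clientMainAgent.sinkgroups = g1
clientMainAgent.sinkgroups.g1.sinks = k1 k2
clientMainAgent.sinkgroups.g1.processor.type = failover
# The sink with the higher priority is used first; k2 takes over only if k1 fails.
clientMainAgent.sinkgroups.g1.processor.priority.k1 = 10
clientMainAgent.sinkgroups.g1.processor.priority.k2 = 5
# Upper limit, in milliseconds, on the backoff applied to a failed sink.
clientMainAgent.sinkgroups.g1.processor.maxpenalty = 10000
```

With this setting all traffic goes to the collector behind k1 while it is healthy, so the second collector acts purely as a standby rather than sharing the load.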
4. Possible Problems:
4.1 OOM:
Flume reports the error: java.lang.OutOfMemoryError: GC overhead limit exceeded
or: java.lang.OutOfMemoryError: Java heap space
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
Try adding JVM startup parameters in .bashrc or conf/flume-env.sh, adjusting the heap sizes to the memory available on the machine:
export JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

4.2 JDK version incompatibility:
2014-07-07 14:44:17,902 (agent-shutdown-hook) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:504)] Exception while closing hdfs:// Exception follows.
java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses.
  at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetFileInfoRequestProto.getSerializedSize(ClientNamenodeProtocolProtos.java:30108)
  at com.google.protobuf.AbstractMessageLite.toByteString(AbstractMessageLite.java:49)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.constructRpcRequest(ProtobufRpcEngine.java:149)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:193)

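Try replacing JDK 7 with JDK 6. This exception is also commonly caused by a mismatch between the protobuf-java version on Flume's classpath and the version Hadoop was built against, so it is worth checking the protobuf jars as well.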