
Configure Flume Reader

Last Updated: Mar 21, 2018

Apache Flume is a distributed, highly available service for large-scale log collection, aggregation, and transmission. It allows you to customize data senders of various types in the log system to collect data. Apache Flume also provides the ability to process data and write it to various data recipients (for example, text, HDFS, and HBase).

Flume's data flow is carried by events from beginning to end. An event is the basic data unit of Flume; it carries both the log data (as a byte array) and header information. Events are generated by sources external to the agent. After capturing an event, the source applies specific formatting and then pushes the event to one or more channels. You can regard a channel as a buffer: it stores the event until the sink finishes processing it. The sink is responsible for persisting the log or pushing the event to another source.
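
For orientation, a minimal agent definition that wires one source, one channel, and one sink together might look like the following sketch in Flume's properties format. The names a1, r1, c1, and k1 are placeholders, and the netcat source and logger sink are standard Flume components used here only for illustration, not part of this reader's configuration:

# Declare the components of agent a1 (names are illustrative)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: listen on a local port (example only)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Channel: buffer events in memory between the source and the sink
a1.channels.c1.type = memory

# Sink: write events to the log
a1.sinks.k1.type = logger

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1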

Flume components

Flume components are shown as follows:

Component | Feature
Agent | Runs Flume in a JVM. Only one agent runs on each machine, but one agent can contain multiple sources and sinks.
Client | Produces data and runs in a separate thread.
Source | Collects log data, packages it into transactions and events, and distributes them to the channel(s).
Sink | Fetches data from the channel and stores it in the corresponding file storage system or database, or submits it to a remote server.
Channel | Mainly acts as a queue, caching the data provided by the sources.
Event | The basic data unit; can be a log record, an Avro object, and so on.

Log streaming configuration mainly involves the following parameters:

  • Channel parameters: file-channel and memory-channel.
  • Source parameters: TailDirSource.

Channel parameter configurations

The channel located between the source and sink is used to cache the inbound events. Events are removed from the channel after being sent successfully by the sink to the next channel or the final destination.

File-channel

The file-channel persists all events by storing them on disk. Therefore, data is not lost even if the machine fails, the operating system crashes or restarts, or an event has not yet been successfully delivered to the next agent in the pipeline.

File-channel parameters are set as follows:

Parameter name | Default value | Description
Type | - | Channel type name
CheckpointDir | ~/.flume/file-channel/checkpoint | The directory where checkpoint files are stored
DataDirs | ~/.flume/file-channel/data | The directory where the data is stored
Capacity | 100000 | The maximum channel capacity
ByteCapacity | 2000000 | The maximum byte capacity of the channel
TransactionCapacity | 100000 | The maximum transaction capacity
CheckpointInterval | 30000 | The interval between neighboring checkpoints (in milliseconds)
MaxFileSize | 2146435071 | The maximum size of a single log file (in bytes)
MinimumRequiredSpace | 524288000 | The minimum required free disk space (in bytes)
Keep-alive | 3 | The waiting time for a storage operation (in seconds)
Write-timeout | 3 | The waiting time for a write operation (in seconds)
Checkpoint-timeout | 600 | Expert: the waiting time for a checkpoint operation (in seconds)
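
Based on the parameters above, a file-channel definition in Flume's properties format might look like the following sketch. The agent name a1, the channel name c1, and the directories under /mnt/flume are assumptions used only for illustration:

a1.channels = c1
a1.channels.c1.type = file
# Store checkpoints and data on disk so events survive restarts (paths are illustrative)
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
# Interval between checkpoints, in milliseconds
a1.channels.c1.checkpointInterval = 30000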

Memory-channel

With the memory-channel, the entire transaction is completed in memory. The memory-channel is less reliable because it stores all events in memory: if the process dies, all events held in memory are lost. In addition, the available space is limited by the RAM size. The strength of the file-channel, by contrast, is that it can store all event data on disk as long as sufficient disk space is available.

Memory-channel parameters are set as follows:

Parameter name | Default value | Description
Type | - | Channel type name
Capacity | 100 | The maximum number of events stored in the channel
TransactionCapacity | 100 | The maximum number of events that can be taken from a source or sent to a sink in each transaction
Keep-alive | 3 | The time-out value (in seconds) for adding or removing an event
ByteCapacityBufferPercentage | 20 | The percentage of byteCapacity reserved as a buffer
ByteCapacity | See description | The maximum total size, in bytes, of all events allowed in the channel

Events are stored in an in-memory queue. The memory-channel is suitable when high throughput is required and data loss in the case of agent failure is acceptable.

  • Capacity: The maximum number of events stored in the channel; 100 by default.

  • TransactionCapacity: The maximum number of events that can be taken from a source or sent to a sink in each transaction; 100 by default.

  • Keep-alive: The maximum time (in seconds) allowed for adding or removing an event in the channel.

  • ByteCapacity: The limit on the total number of bytes of events, counting only the event body.

A typical memory-channel configuration is as follows:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

Source parameter configuration

TailDirSource configuration

Flume provides a TailSource and a TailDirSource. The TailDirSource reads a file and sends its content to the sink line by line. The TailSource supports four configuration parameters: WaitTime, File, StartFromEnd, and Offset.

  • WaitTime: The interval between file reads. The program reads the file content block by block into a buffer and then parses it line by line. Optional; defaults to reading once per second.

  • File: (Required) The absolute path of the file to be read.

  • StartFromEnd: Whether to start reading from the end of the file. Defaults to false.

  • Offset: The offset to start reading from. Optional; defaults to 0.

Parameter name | Default value | Description
Channels | - | -
Type | - | Component type name.
Filegroups | - | A list of space-separated file group names (each file group points to a set of files to be tailed).
Filegroups.<filegroupName> | - | The absolute path of the file group. Regular expressions (not file system patterns) can be used for the file name only.
PositionFile | ~/.flume/taildir_position.json | The JSON file that records the last read position of each tailed file.
Headers.<filegroupName>.<headerKey> | - | The header value set for the given header key. Multiple headers can be specified for one file group.
SkipToEnd | False | Whether to skip to the end of a file that is not recorded in the position file.
IdleTimeout | 120000 | The time (in milliseconds) after which an inactive file is closed. If new lines are appended to a closed file, the source reopens it automatically.
BatchSize | 100 | The maximum number of lines read and sent to the channel at a time.
FileHeader | False | Whether to add a header that stores the absolute file path.
FileHeaderKey | File | The header key used when adding the absolute file path to the event header.
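
Putting the parameters above together, a TailDirSource definition in Flume's properties format might look like the following sketch. The agent name a1, the source name r1, the channel name c1, the file group f1, and the log path pattern are assumptions used only for illustration:

a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
# JSON file that records the last read position of each tailed file (path is illustrative)
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
# One file group named f1, matching log files by regular expression
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/.*\.log
# Optional header added to every event from file group f1
a1.sources.r1.headers.f1.headerKey1 = value1
# Add the absolute file path as a header on each event
a1.sources.r1.fileHeader = true
a1.sources.r1.batchSize = 100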

Parameter description

Parameter | Description | Required or not | Default value
sources | All the log collection sources, including web pages, log files, databases, and ports. | Yes | None
channels | The temporary storage area used during transfer; can be a local file, Redis, Kafka, a database, or memory. | Yes | None
Type | The channel type name. | Yes | None
ByteCapacity | The maximum total size, in bytes, of all events allowed in the channel. | Yes | None
Capacity | The maximum channel capacity. | Yes | None
transactionCapacity | The maximum transaction capacity. | Yes | None

Development in wizard mode

Currently, development in wizard mode is unavailable.

Development in script mode

Configure a log streaming job using the file-channel:

{
  "type": "stream",
  "traceId": "taildir to datahub stream test5",
  "version": "1.0",
  "configuration": {
    "setting": {
      "speed": {
        "channel": 100
      },
      "errorLimit": {
        "record": 0
      },
      "channel": {
        "agent.channels": "file",//File channel
        "agent.channels.file.checkpointDir": "",//The directory where checkpoint files are stored
        "agent.channels.file.dataDirs": "",//The directory where the data is stored
        "agent.channels.file.capacity": "100000",//Maximum channel capacity
        "agent.channels.file.transactionCapacity": "100000",//Maximum transaction capacity
        "agent.channels.file.byteCapacity": "2000000",//Maximum byte capacity of the channel
        "agent.channels.file.type": "file"//Type name
      }
    },
    "reader": {
      "plugin": "flume",
      "parameter": {
        "agent.sources": "taildir",//Source name
        "agent.sources.taildir.type": "TAILDIR",//Type name
        "agent.sources.taildir.channels": "file",//The file-channel defined in the preceding channel settings
        "agent.sources.taildir.filegroups": "f1",//File group name
        "agent.sources.taildir.filegroups.f1": "/home/f1/x.log",//Path of the file in the file group
        "agent.sources.taildir.headers.f1.headerKey1": "value1",//Header value set for header key headerKey1 on events from this file group
        "agent.sources.taildir.fileHeader": "true"//Whether to add a header that stores the absolute file path
      }
    },
    "writer": {}
  }
}
Configure a log streaming job using the memory-channel:

{
  "configuration": {
    "reader": {
      "parameter": {
        "agent.sources": "taildir",//Source name
        "agent.channels": "memoryChannel",//The memory-channel defined in the channel settings
        "agent.sources.taildir.fileHeader": "true",//Whether to add a header that stores the absolute file path
        "agent.sources.taildir.filegroups": "f1",//File group name
        "agent.sources.taildir.filegroups.f1": "/home/lzz/x.log",//Path of the file in the file group
        "agent.sources.taildir.headers.f1.headerKey1": "value1",//Header value set for header key headerKey1 on events from this file group
        "agent.sources.taildir.type": "TAILDIR"//Type name
      },
      "plugin": "flume"
    },
    "setting": {
      "channel": {
        "agent.channels": "memoryChannel",//Memory channel
        "agent.channels.memoryChannel.byteCapacity": "800000",//Maximum byte capacity of the channel
        "agent.channels.memoryChannel.capacity": "10000",//Maximum channel capacity
        "agent.channels.memoryChannel.transactionCapacity": "10000",//Maximum transaction capacity
        "agent.channels.memoryChannel.type": "memory"//Type name
      },
      "errorLimit": {
        "record": 0
      },
      "speed": {
        "byte": 1048576
      }
    },
    "writer": {}
  }
}

Tip: The Real-time Log feature is still in beta testing. Open a ticket with our Data Integration team to apply for trial use.
