Use E-MapReduce (EMR) Apache Flume to stream Hadoop Distributed File System (HDFS) audit logs to HDFS in real time. This guide walks you through configuring Flume agents on each node type and starting them from the EMR console.
How it works
The pipeline uses three Flume agents running across your cluster:
- Master node (master-1-1): A taildir source reads the HDFS audit log file (/mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.*), batches events, and forwards them over Avro RPC to the core nodes.
- Core nodes (core-1-1, core-1-2): An Avro source receives events from the master node, and an HDFS sink writes them to HDFS. The two core nodes act as a failover pair: the master node's sink group prioritizes core-1-1 and fails over to core-1-2.
master-1-1
  taildir source ──avro──► core-1-1 (HDFS sink, priority 10)
                 ──avro──► core-1-2 (HDFS sink, priority 5, failover)
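The failover behavior described above can be sketched in a few lines of Python. This is a simplified model for illustration, not Flume's implementation: the function name `pick_sink` and the `failed` set are hypothetical, and the real failover processor also applies a back-off period before retrying a failed sink.

```python
from typing import Dict, Optional, Set


def pick_sink(priorities: Dict[str, int], failed: Set[str]) -> Optional[str]:
    """Return the highest-priority sink that is not currently marked as failed."""
    live = {name: prio for name, prio in priorities.items() if name not in failed}
    if not live:
        return None  # no sink available; events stay in the channel
    return max(live, key=live.get)


# Priorities taken from this guide: core-1-1 = 10, core-1-2 = 5.
priorities = {"core-1-1": 10, "core-1-2": 5}

print(pick_sink(priorities, failed=set()))         # core-1-1
print(pick_sink(priorities, failed={"core-1-1"}))  # core-1-2
```

While core-1-1 is healthy it receives all traffic; core-1-2 is used only when core-1-1 is unavailable.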
Prerequisites
Before you begin, ensure that you have:
- An EMR data lake cluster
- Flume selected as an optional service when the cluster was created (see Create a cluster)
Configure and start Flume agents
Step 1: Go to the Services tab
- Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
- In the top navigation bar, select the region where your cluster resides and select a resource group.
- On the EMR on ECS page, click Services in the Actions column of your cluster.
Step 2: Open the Flume configuration
On the Services tab, click Configure in the Flume service section.
Step 3: Configure the core-1-1 agent
- On the Configure tab, click the flume-conf.properties subtab.
- From the drop-down lists, select Independent Node Configuration and core-1-1.
- Replace the contents of flume-conf.properties with the following configuration:

    # Agent topology
    default-agent.sinks = default-sink
    default-agent.sources = default-source
    default-agent.channels = default-channel

    # Source: receive events from the master node over Avro RPC
    default-agent.sources.default-source.type = avro
    default-agent.sources.default-source.bind = 0.0.0.0
    default-agent.sources.default-source.port = <avro-source-port>
    default-agent.sources.default-source.channels = default-channel

    # Channel: buffer events to disk for durability
    default-agent.channels.default-channel.type = file
    default-agent.channels.default-channel.transactionCapacity = 10000
    default-agent.channels.default-channel.capacity = 1000000
    default-agent.channels.default-channel.dataDirs = <data-dir>
    default-agent.channels.default-channel.checkpointDir = <checkpoint-dir>

    # Sink: write events to HDFS, rolling files once per day
    default-agent.sinks.default-sink.type = hdfs
    default-agent.sinks.default-sink.channel = default-channel
    default-agent.sinks.default-sink.hdfs.path = hdfs://master-1-1:9000/<output-path>
    default-agent.sinks.default-sink.hdfs.fileType = DataStream
    default-agent.sinks.default-sink.hdfs.rollInterval = 86400
    default-agent.sinks.default-sink.hdfs.rollSize = 0
    default-agent.sinks.default-sink.hdfs.rollCount = 0
    default-agent.sinks.default-sink.hdfs.batchSize = 51200

  Replace the following placeholders with your actual values:

  | Placeholder | Description | Example |
  | --- | --- | --- |
  | <avro-source-port> | Port the Avro source listens on | 44444 |
  | <data-dir> | Directory where channel events are buffered | ~/.flume/file-channel/data |
  | <checkpoint-dir> | Directory where channel checkpoints are stored | ~/.flume/file-channel/checkpoint |
  | <output-path> | HDFS path where audit logs are written | /audit-logs/hdfs |

  For a high availability (HA) cluster, use hdfs://emr-cluster/<output-path> instead of hdfs://master-1-1:9000/<output-path>.

  Key parameters for the HDFS sink:

  | Parameter | Default | Value in this configuration | Description |
  | --- | --- | --- | --- |
  | hdfs.rollInterval | 30 s | 86400 | Seconds between file rolls. Set to 86400 to roll once per day. |
  | hdfs.rollSize | 1024 bytes | 0 | File size (bytes) that triggers a roll. 0 = never roll based on file size. |
  | hdfs.rollCount | 10 | 0 | Number of events that triggers a roll. 0 = never roll based on event count. |
  | hdfs.batchSize | 100 | 51200 | Events written to the file before flushing to HDFS. |
  | hdfs.fileType | SequenceFile | DataStream | Output file format. Use DataStream for plain-text audit logs. |
  | transactionCapacity | - | 10000 | Maximum events the channel passes in a single transaction. |
  | capacity | - | 1000000 | Maximum events the channel can hold. Set this high enough to buffer traffic between file rolls. |

  For a full list of Flume parameters, see Apache Flume User Guide.
- Click Save in the lower-left corner. In the dialog box, enter an execution reason and click Save.
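To illustrate how the three roll settings in Step 3 interact, the following Python sketch models the roll decision. It is a simplification under stated assumptions: the function `should_roll` is hypothetical and is not part of Flume, which triggers time-based rolls with a timer rather than a polled check. The point it shows is that a trigger set to 0 is disabled, so the configuration above rolls on time only.

```python
def should_roll(age_s, size_bytes, events,
                roll_interval=30, roll_size=1024, roll_count=10):
    """Return True if any enabled trigger fires. A trigger set to 0 is disabled.

    The keyword defaults mirror the HDFS sink defaults listed in this guide.
    """
    return (
        (roll_interval > 0 and age_s >= roll_interval)
        or (roll_size > 0 and size_bytes >= roll_size)
        or (roll_count > 0 and events >= roll_count)
    )


# Settings from this guide: rollInterval = 86400, rollSize = 0, rollCount = 0.
guide = dict(roll_interval=86400, roll_size=0, roll_count=0)

# One hour in, 1 GB written, a million events: no roll, size and count are disabled.
print(should_roll(3600, 10**9, 10**6, **guide))   # False

# After a full day the time trigger fires.
print(should_roll(86400, 10**9, 10**6, **guide))  # True

# With the defaults, a 2 KB file would already have rolled on size.
print(should_roll(1, 2048, 1))                    # True
```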
Step 4: Configure the core-1-2 agent
Repeat Step 3, selecting core-1-2 from the node drop-down list. Use the same configuration, but set <avro-source-port> to the port you will reference for core-1-2 in the master node's sink configuration.
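If you keep the core-1-1 configuration as a text file, a small helper can derive the core-1-2 variant by changing only the Avro source port. The sketch below is an optional convenience, not part of the EMR console workflow; the function name `with_avro_port` and the port 44445 are hypothetical, while 44444 is the example value from Step 3.

```python
import re

PORT_KEY = "default-agent.sources.default-source.port"


def with_avro_port(config_text: str, port: int) -> str:
    """Return a copy of the configuration with the Avro source port replaced."""
    pattern = re.compile(
        rf"^({re.escape(PORT_KEY)}\s*=\s*).*$", flags=re.MULTILINE
    )
    # A function replacement avoids ambiguity between the group reference and the digits.
    new_text, count = pattern.subn(lambda m: f"{m.group(1)}{port}", config_text)
    if count != 1:
        raise ValueError(f"expected exactly one '{PORT_KEY}' line, found {count}")
    return new_text


core_1_1 = (
    "default-agent.sources.default-source.bind = 0.0.0.0\n"
    "default-agent.sources.default-source.port = 44444\n"
)
core_1_2 = with_avro_port(core_1_1, 44445)
print(core_1_2)
```

Whatever port you choose here must match the port set for the k1 sink in the master node configuration.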
Step 5: Configure the master-1-1 agent
- On the Configure tab, select Independent Node Configuration and master-1-1 from the drop-down lists.
- Replace the contents of flume-conf.properties with the following configuration:

    # Agent topology: one source, one channel, two sinks in a failover group
    default-agent.sinks = default-sink k1
    default-agent.sources = default-source
    default-agent.channels = default-channel
    default-agent.sinkgroups = g1

    # Source: tail the HDFS audit log file
    default-agent.sources.default-source.type = taildir
    default-agent.sources.default-source.filegroups = f1
    default-agent.sources.default-source.filegroups.f1 = /mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.*
    default-agent.sources.default-source.positionFile = ~/.flume/taildir_position.json
    default-agent.sources.default-source.batchSize = 2000
    default-agent.sources.default-source.ignoreRenameWhenMultiMatching = true
    default-agent.sources.default-source.channels = default-channel

    # Channel: buffer events to disk for durability
    default-agent.channels.default-channel.type = file
    default-agent.channels.default-channel.transactionCapacity = 2000
    default-agent.channels.default-channel.capacity = <capacity>
    default-agent.channels.default-channel.dataDirs = <data-dir>
    default-agent.channels.default-channel.checkpointDir = <checkpoint-dir>

    # Sink group: failover between core-1-1 (priority 10) and core-1-2 (priority 5)
    default-agent.sinkgroups.g1.sinks = default-sink k1
    default-agent.sinkgroups.g1.processor.type = failover
    default-agent.sinkgroups.g1.processor.priority.default-sink = 10
    default-agent.sinkgroups.g1.processor.priority.k1 = 5

    # Sink: forward to core-1-1 over Avro RPC
    default-agent.sinks.default-sink.type = avro
    default-agent.sinks.default-sink.channel = default-channel
    default-agent.sinks.default-sink.hostname = <core-1-1-ip>
    default-agent.sinks.default-sink.port = <core-1-1-port>
    default-agent.sinks.default-sink.batch-size = 2000

    # Sink: forward to core-1-2 over Avro RPC (failover target)
    default-agent.sinks.k1.type = avro
    default-agent.sinks.k1.channel = default-channel
    default-agent.sinks.k1.hostname = <core-1-2-ip>
    default-agent.sinks.k1.port = <core-1-2-port>
    default-agent.sinks.k1.batch-size = 2000

  Replace the following placeholders:

  | Placeholder | Description |
  | --- | --- |
  | <capacity> | Maximum events the channel can hold. Scale based on your rollInterval and event rate. |
  | <data-dir> | Directory where channel events are buffered |
  | <checkpoint-dir> | Directory where channel checkpoints are stored |
  | <core-1-1-ip> | IP address of the core-1-1 node |
  | <core-1-1-port> | Avro source port configured on core-1-1 |
  | <core-1-2-ip> | IP address of the core-1-2 node |
  | <core-1-2-port> | Avro source port configured on core-1-2 |

  Key parameters for the master node:

  | Parameter | Default | Value in this configuration | Description |
  | --- | --- | --- | --- |
  | sources.default-source.type | - | taildir | Tails one or more files and tracks read positions across restarts. |
  | sources.default-source.batchSize | 100 | 2000 | Maximum events read from the log file in a single pass. |
  | sources.default-source.ignoreRenameWhenMultiMatching | false | true | Prevents duplicate events when log4j rotates files matched by the wildcard pattern. |
  | channels.default-channel.transactionCapacity | - | 2000 | Maximum events the channel takes from the source or pushes to a sink per transaction. Must be >= batchSize. |
  | sinkgroups.g1.processor.type | default | failover | Routes all traffic to the highest-priority sink; switches to the next priority on failure. Valid values: default, failover, load_balance. |
  | sinkgroups.g1.processor.priority.default-sink | - | 10 | Priority weight for core-1-1. Higher value = higher priority. |
  | sinkgroups.g1.processor.priority.k1 | - | 5 | Priority weight for core-1-2. Used when core-1-1 is unavailable. |

- Click Save in the lower-left corner. In the dialog box, enter an execution reason and click Save.
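Before you save the master node configuration, you may want to check the constraints stated in Step 5: every sink in the sink group is declared and has a priority, and the channel's transactionCapacity is at least as large as each batch size. The following Python sketch is one way to do that offline. The helpers `parse_properties` and `check_master` are hypothetical and not part of Flume or EMR, the check assumes a single channel as in this guide, and the sample text is an excerpt of the configuration above.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_master(props, agent="default-agent"):
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    sinks = props.get(f"{agent}.sinks", "").split()

    # Every sink-group member should be a declared sink with a priority.
    for group in props.get(f"{agent}.sinkgroups", "").split():
        prefix = f"{agent}.sinkgroups.{group}"
        for member in props.get(f"{prefix}.sinks", "").split():
            if member not in sinks:
                problems.append(f"{group}: sink '{member}' is not declared")
            if f"{prefix}.processor.priority.{member}" not in props:
                problems.append(f"{group}: sink '{member}' has no priority")

    # transactionCapacity should be >= every source and sink batch size.
    # Assumption: exactly one channel is configured, as in this guide.
    channel = props[f"{agent}.channels"].split()[0]
    tx = int(props[f"{agent}.channels.{channel}.transactionCapacity"])
    for key, value in props.items():
        if key.endswith((".batchSize", ".batch-size")) and int(value) > tx:
            problems.append(f"{key} = {value} exceeds transactionCapacity {tx}")
    return problems


MASTER_EXCERPT = """
default-agent.sinks = default-sink k1
default-agent.channels = default-channel
default-agent.sinkgroups = g1
default-agent.sources.default-source.batchSize = 2000
default-agent.channels.default-channel.transactionCapacity = 2000
default-agent.sinkgroups.g1.sinks = default-sink k1
default-agent.sinkgroups.g1.processor.priority.default-sink = 10
default-agent.sinkgroups.g1.processor.priority.k1 = 5
default-agent.sinks.default-sink.batch-size = 2000
default-agent.sinks.k1.batch-size = 2000
"""

print(check_master(parse_properties(MASTER_EXCERPT)))  # prints []
```

Lowering transactionCapacity below 2000 in the excerpt makes the check report the source and both sinks, which is the situation the "Must be >= batchSize" note warns about.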
Step 6: Start the Flume agents
- In the upper-right corner of the Flume service page, choose More > Restart.
- In the dialog box, set the Execution Reason parameter and click OK.
- In the confirmation dialog, click OK.
Audit logs are now synchronized to HDFS. Flume agent logs are stored at /var/log/emr/flume/default-agent/flume.log.