E-MapReduce: Sync HDFS audit logs to HDFS

Last Updated: Mar 26, 2026

Use Apache Flume on E-MapReduce (EMR) to stream Hadoop Distributed File System (HDFS) audit logs to HDFS in real time. This guide walks you through configuring a Flume agent on the master node and on each core node, and then starting the agents from the EMR console.

How it works

The pipeline uses three Flume agents running across your cluster:

  • Master node (master-1-1): A taildir source reads the HDFS audit log files (/mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.*), batches events, and forwards them over Avro RPC to one of the core nodes: core-1-1 by default, or core-1-2 if core-1-1 is unavailable.

  • Core nodes (core-1-1, core-1-2): An Avro source receives events from the master node, and an HDFS sink writes them to HDFS. The two core nodes act as a failover pair: the master node's sink group prefers core-1-1 and fails over to core-1-2.

master-1-1
  taildir source  ──avro──►  core-1-1  (HDFS sink, priority 10)
                  ──avro──►  core-1-2  (HDFS sink, priority 5, failover)

Prerequisites

Before you begin, ensure that you have:

  • An EMR data lake cluster

  • Flume selected as an optional service when the cluster was created (see Create a cluster)

Configure and start Flume agents

Step 1: Go to the Services tab

  1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.

  2. In the top navigation bar, select the region where your cluster resides and select a resource group.

  3. On the EMR on ECS page, click Services in the Actions column of your cluster.

Step 2: Open the Flume configuration

On the Services tab, click Configure in the Flume service section.

Step 3: Configure the core-1-1 agent

  1. On the Configure tab, click the flume-conf.properties subtab.

  2. From the drop-down lists, select Independent Node Configuration and core-1-1.

  3. Replace the contents of flume-conf.properties with the following configuration:

    # Agent topology
    default-agent.sinks = default-sink
    default-agent.sources = default-source
    default-agent.channels = default-channel
    
    # Source: receive events from the master node over Avro RPC
    default-agent.sources.default-source.type = avro
    default-agent.sources.default-source.bind = 0.0.0.0
    default-agent.sources.default-source.port = <avro-source-port>
    default-agent.sources.default-source.channels = default-channel
    
    # Channel: buffer events to disk for durability
    default-agent.channels.default-channel.type = file
    default-agent.channels.default-channel.transactionCapacity = 10000
    default-agent.channels.default-channel.capacity = 1000000
    default-agent.channels.default-channel.dataDirs = <data-dir>
    default-agent.channels.default-channel.checkpointDir = <checkpoint-dir>
    
    # Sink: write events to HDFS, rolling files once per day
    default-agent.sinks.default-sink.type = hdfs
    default-agent.sinks.default-sink.channel = default-channel
    default-agent.sinks.default-sink.hdfs.path = hdfs://master-1-1:9000/<output-path>
    default-agent.sinks.default-sink.hdfs.fileType = DataStream
    default-agent.sinks.default-sink.hdfs.rollInterval = 86400
    default-agent.sinks.default-sink.hdfs.rollSize = 0
    default-agent.sinks.default-sink.hdfs.rollCount = 0
    default-agent.sinks.default-sink.hdfs.batchSize = 51200

    Replace the following placeholders with your actual values:

    Placeholder         Description                                      Example
    <avro-source-port>  Port the Avro source listens on                  44444
    <data-dir>          Directory where channel events are buffered      ~/.flume/file-channel/data
    <checkpoint-dir>    Directory where channel checkpoints are stored   ~/.flume/file-channel/checkpoint
    <output-path>       HDFS path where audit logs are written           /audit-logs/hdfs

    For a high availability (HA) cluster, use hdfs://emr-cluster/<output-path> instead of hdfs://master-1-1:9000/<output-path>.

    Key parameters for the HDFS sink and the file channel:

    Parameter            Default       Value in this configuration  Description
    hdfs.rollInterval    30 s          86400                        Seconds between file rolls. Set to 86400 to roll once per day.
    hdfs.rollSize        1024 bytes    0                            File size (bytes) that triggers a roll. 0 = never roll based on file size.
    hdfs.rollCount       10            0                            Number of events that triggers a roll. 0 = never roll based on event count.
    hdfs.batchSize       100           51200                        Events written to the file before flushing to HDFS.
    hdfs.fileType        SequenceFile  DataStream                   Output file format. Use DataStream for plain-text audit logs.
    transactionCapacity  -             10000                        Maximum events the channel passes in a single transaction.
    capacity             -             1000000                      Maximum events the channel can hold. Set this high enough to buffer traffic between file rolls.

    For a full list of Flume parameters, see the Apache Flume User Guide.

  4. Click Save in the lower-left corner. In the dialog box, enter an execution reason and click Save.
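
As an illustration, the placeholder lines from Step 3 might look as follows once filled in with the example values from the placeholder table. This is only a sketch: adjust the port, directories, and output path for your cluster. Note that the leading slash of the example output path is not duplicated after the port.

```properties
# Placeholder lines from Step 3, filled in with the example values
# from the placeholder table (illustrative only).
default-agent.sources.default-source.port = 44444
default-agent.channels.default-channel.dataDirs = ~/.flume/file-channel/data
default-agent.channels.default-channel.checkpointDir = ~/.flume/file-channel/checkpoint
default-agent.sinks.default-sink.hdfs.path = hdfs://master-1-1:9000/audit-logs/hdfs

# HA cluster variant of the last line:
# default-agent.sinks.default-sink.hdfs.path = hdfs://emr-cluster/audit-logs/hdfs
```

All other lines of the Step 3 configuration stay as shown in the full listing.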

Step 4: Configure the core-1-2 agent

Repeat Step 3, selecting core-1-2 from the node drop-down list. Use the same configuration, but set <avro-source-port> to the port you will reference for core-1-2 in the master node's sink configuration.

Step 5: Configure the master-1-1 agent

  1. On the Configure tab, select Independent Node Configuration and master-1-1 from the drop-down lists.

  2. Replace the contents of flume-conf.properties with the following configuration:

    # Agent topology: one source, one channel, two sinks in a failover group
    default-agent.sinks = default-sink k1
    default-agent.sources = default-source
    default-agent.channels = default-channel
    default-agent.sinkgroups = g1
    
    # Source: tail the HDFS audit log file
    default-agent.sources.default-source.type = taildir
    default-agent.sources.default-source.filegroups = f1
    default-agent.sources.default-source.filegroups.f1 = /mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.*
    default-agent.sources.default-source.positionFile = ~/.flume/taildir_position.json
    default-agent.sources.default-source.batchSize = 2000
    default-agent.sources.default-source.ignoreRenameWhenMultiMatching = true
    default-agent.sources.default-source.channels = default-channel
    
    # Channel: buffer events to disk for durability
    default-agent.channels.default-channel.type = file
    default-agent.channels.default-channel.transactionCapacity = 2000
    default-agent.channels.default-channel.capacity = <capacity>
    default-agent.channels.default-channel.dataDirs = <data-dir>
    default-agent.channels.default-channel.checkpointDir = <checkpoint-dir>
    
    # Sink group: failover between core-1-1 (priority 10) and core-1-2 (priority 5)
    default-agent.sinkgroups.g1.sinks = default-sink k1
    default-agent.sinkgroups.g1.processor.type = failover
    default-agent.sinkgroups.g1.processor.priority.default-sink = 10
    default-agent.sinkgroups.g1.processor.priority.k1 = 5
    
    # Sink: forward to core-1-1 over Avro RPC
    default-agent.sinks.default-sink.type = avro
    default-agent.sinks.default-sink.channel = default-channel
    default-agent.sinks.default-sink.hostname = <core-1-1-ip>
    default-agent.sinks.default-sink.port = <core-1-1-port>
    default-agent.sinks.default-sink.batch-size = 2000
    
    # Sink: forward to core-1-2 over Avro RPC (failover target)
    default-agent.sinks.k1.type = avro
    default-agent.sinks.k1.channel = default-channel
    default-agent.sinks.k1.hostname = <core-1-2-ip>
    default-agent.sinks.k1.port = <core-1-2-port>
    default-agent.sinks.k1.batch-size = 2000

    Replace the following placeholders:

    Placeholder        Description
    <capacity>         Maximum events the channel can hold. Scale based on your rollInterval and event rate.
    <data-dir>         Directory where channel events are buffered
    <checkpoint-dir>   Directory where channel checkpoints are stored
    <core-1-1-ip>      IP address of the core-1-1 node
    <core-1-1-port>    Avro source port configured on core-1-1
    <core-1-2-ip>      IP address of the core-1-2 node
    <core-1-2-port>    Avro source port configured on core-1-2

    Key parameters for the master node:

    Parameter                                             Default  Value in this configuration  Description
    sources.default-source.type                           -        taildir                      Tails one or more files and tracks read positions across restarts.
    sources.default-source.batchSize                      100      2000                         Maximum events read from the log file in a single pass.
    sources.default-source.ignoreRenameWhenMultiMatching  false    true                         Prevents duplicate events when log4j rotates files matched by the wildcard pattern.
    channels.default-channel.transactionCapacity          -        2000                         Maximum events the channel takes from the source or pushes to a sink per transaction. Must be >= batchSize.
    sinkgroups.g1.processor.type                          default  failover                     Routes all traffic to the highest-priority sink; switches to the next priority on failure. Valid values: default, failover, load_balance.
    sinkgroups.g1.processor.priority.default-sink         -        10                           Priority weight for core-1-1. Higher value = higher priority.
    sinkgroups.g1.processor.priority.k1                   -        5                            Priority weight for core-1-2. Used when core-1-1 is unavailable.

  3. Click Save in the lower-left corner. In the dialog box, enter an execution reason and click Save.
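
For illustration, the placeholder lines in the master-1-1 configuration could be filled in as sketched below. The IP addresses and the capacity figure are hypothetical values chosen for this example and do not describe any real cluster. The ports assume that both core agents use the example value 44444 from Step 3.

```properties
# Hypothetical values for illustration only; substitute your own.
# Channel capacity: assumed here to mirror the core-node setting.
default-agent.channels.default-channel.capacity = 1000000
default-agent.channels.default-channel.dataDirs = ~/.flume/file-channel/data
default-agent.channels.default-channel.checkpointDir = ~/.flume/file-channel/checkpoint

# core-1-1 (priority 10): port must equal the Avro source port set on core-1-1.
default-agent.sinks.default-sink.hostname = 192.168.0.11
default-agent.sinks.default-sink.port = 44444

# core-1-2 (priority 5): port must equal the Avro source port set on core-1-2.
default-agent.sinks.k1.hostname = 192.168.0.12
default-agent.sinks.k1.port = 44444
```

Because the two Avro sources run on different hosts, they can listen on the same port number. If you chose different ports in Steps 3 and 4, use those values instead.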

Step 6: Start the Flume agents

  1. In the upper-right corner of the Flume service page, choose More > Restart.

  2. In the dialog box, set the Execution Reason parameter and click OK.

  3. In the confirmation dialog, click OK.

Audit logs are now synchronized to HDFS. Flume agent logs are stored at /var/log/emr/flume/default-agent/flume.log.