E-MapReduce (EMR) Flume can be started in multiple ways. This topic describes how to modify Flume configurations and start Flume agents in the EMR console to synchronize audit logs to HDFS in real time.

Prerequisites

An EMR data lake cluster is created, and Flume is selected from the optional services during cluster creation. For more information about how to create a cluster, see Create a cluster.

Procedure

  1. Go to the Services tab.
    1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
    2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
    3. On the EMR on ECS page, click Services in the Actions column of the cluster that you want to manage.
  2. On the Services tab, click Configure in the Flume service section.
  3. Configure the Flume agent of the core-1-1 node and save the configuration.
    Set the parameters based on the configurations in open source Flume. For more information, see Apache Flume.
    1. On the Configure tab, click the flume-conf.properties subtab.
    2. Select Independent Node Configuration and core-1-1 from the drop-down lists.
    3. Modify the values of the parameters of flume-conf.properties as needed.
      default-agent.sinks = default-sink
      default-agent.sources = default-source
      default-agent.channels = default-channel
      default-agent.sinks.default-sink.type = hdfs
      default-agent.sinks.default-sink.channel = default-channel
      default-agent.channels.default-channel.type = file
      default-agent.sources.default-source.type = avro
      default-agent.sinks.default-sink.hdfs.path = hdfs://master-1-1:9000/path
      default-agent.sinks.default-sink.hdfs.fileType = DataStream
      default-agent.sinks.default-sink.hdfs.rollSize = 0
      default-agent.sinks.default-sink.hdfs.rollCount = 0
      default-agent.sinks.default-sink.hdfs.rollInterval = 86400
      default-agent.sinks.default-sink.hdfs.batchSize = 51200
      default-agent.sources.default-source.bind = 0.0.0.0
      default-agent.sources.default-source.port = ****
      default-agent.sources.default-source.channels = default-channel
      default-agent.channels.default-channel.transactionCapacity = 10000
      default-agent.channels.default-channel.dataDirs = ****
      default-agent.channels.default-channel.checkpointDir = ****
      default-agent.channels.default-channel.capacity = 1000000
      Parameter | Description
      default-agent.sinks | The names of all sinks. Example: default-sink.
      default-agent.sources | The names of all sources. Example: default-source.
      default-agent.channels | The names of all channels. Example: default-channel.
      default-agent.sinks.default-sink.hdfs.path | The HDFS path to which events are written. Example for a high-availability (HA) cluster: hdfs://emr-cluster/path. Example for a non-HA cluster: hdfs://master-1-1:9000/path.
      default-agent.sinks.default-sink.hdfs.fileType | The file format. Set the value to DataStream.
      default-agent.sinks.default-sink.hdfs.rollSize | The file size that triggers a roll. Unit: bytes. When the temporary file reaches this size, it is rolled and a new file is started. If you set this parameter to 0, files are not rolled based on the file size.
      default-agent.sinks.default-sink.hdfs.rollCount | The number of events that triggers a roll. When the number of events written to the temporary file reaches this value, the file is rolled and a new file is started. If you set this parameter to 0, files are not rolled based on the number of events.
      default-agent.sinks.default-sink.hdfs.rollInterval | The interval at which a new file is generated. Unit: seconds. Example: 86400 (one day).
      default-agent.sinks.default-sink.hdfs.batchSize | The number of events written to a file before it is flushed to HDFS. Example: 51200.
      default-agent.sinks.default-sink.channel | The name of the channel from which default-sink reads events.
      default-agent.sources.default-source.bind | The IP address on which the source listens. If you set this parameter to 0.0.0.0, the source listens on all IP addresses of the machine.
      default-agent.sources.default-source.port | The listening port. Set this parameter as required.
      default-agent.sources.default-source.channels | The name of the channel to which default-source writes events.
      default-agent.channels.default-channel.transactionCapacity | The maximum number of events that the channel handles in a single transaction. Default value: 10000.
      default-agent.channels.default-channel.dataDirs | Optional. The path where events are stored. Default value: ~/.flume/file-channel/data.
      default-agent.channels.default-channel.checkpointDir | Optional. The path where the checkpoint file is stored. Default value: ~/.flume/file-channel/checkpoint.
      default-agent.channels.default-channel.capacity | Optional. The maximum number of events that the channel can hold. Set this parameter based on the settings of the preceding HDFS rolling parameters. Default value: 1000000.
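
The three HDFS rolling parameters described above act as independent triggers, and a value of 0 disables the corresponding trigger. The following Python sketch is only an illustration of that rule, not part of Flume or EMR. The function name active_roll_triggers is hypothetical.

```python
def active_roll_triggers(roll_size, roll_count, roll_interval):
    """Return the roll triggers that are enabled. A value of 0 disables a trigger."""
    triggers = []
    if roll_size > 0:
        triggers.append(f"size >= {roll_size} bytes")
    if roll_count > 0:
        triggers.append(f"events >= {roll_count}")
    if roll_interval > 0:
        triggers.append(f"age >= {roll_interval} s")
    return triggers


# Values from the sample configuration: only the time-based trigger is active.
print(active_roll_triggers(roll_size=0, roll_count=0, roll_interval=86400))
```

With the sample values, only the time-based trigger remains, so each file is rolled after 86400 seconds, which is one day.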

    4. Save the configurations.
      1. Click Save in the lower-left corner.
      2. In the dialog box that appears, enter an execution reason and click Save.
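
Before you save, it can help to confirm that every source and sink refers to a channel that is declared in default-agent.channels. The following Python sketch shows one way to run that check on the text of flume-conf.properties. The helper names parse_properties and check_wiring are hypothetical, and the parser handles only simple key = value lines.

```python
def parse_properties(text):
    """Parse simple key = value lines. Comments and blank lines are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent="default-agent"):
    """Return a list of problems. An empty list means the wiring is consistent."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} has no channel")
        problems += [
            f"source {source} uses undeclared channel {name!r}"
            for name in used
            if name not in declared
        ]
    for sink in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel", "")
        if channel not in declared:
            problems.append(f"sink {sink} uses undeclared channel {channel!r}")
    return problems


# A subset of the core node configuration from this topic.
SAMPLE = """
default-agent.sinks = default-sink
default-agent.sources = default-source
default-agent.channels = default-channel
default-agent.sinks.default-sink.channel = default-channel
default-agent.sources.default-source.channels = default-channel
"""

print(check_wiring(parse_properties(SAMPLE)))
```

An empty list means that all sources and sinks are attached to declared channels.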
  4. Configure the Flume agent of the core-1-2 node and save the configuration by referring to Step 3.
  5. Configure the Flume agent of the master node group and save the configuration.
    Set the parameters based on the configurations in open source Flume. For more information, see Apache Flume.
    1. On the Configure tab, select Independent Node Configuration and master-1-1 from the drop-down lists.
    2. Modify the values of the parameters of flume-conf.properties as needed.
      default-agent.sinks = default-sink k1
      default-agent.sources = default-source
      default-agent.channels = default-channel
      default-agent.sources.default-source.type = taildir
      default-agent.sinks.default-sink.type = avro
      default-agent.sinks.default-sink.channel = default-channel
      default-agent.channels.default-channel.type = file
      default-agent.sources.default-source.filegroups = f1
      default-agent.sources.default-source.filegroups.f1 = /mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.*
      default-agent.sources.default-source.positionFile = ~/.flume/taildir_position.json
      default-agent.sources.default-source.channels = default-channel
      default-agent.sources.default-source.batchSize = 2000
      default-agent.sources.default-source.ignoreRenameWhenMultiMatching = true
      default-agent.channels.default-channel.checkpointDir = ****
      default-agent.channels.default-channel.dataDirs = ****
      default-agent.channels.default-channel.capacity = ****
      default-agent.channels.default-channel.transactionCapacity = 2000
      default-agent.sinkgroups = g1
      default-agent.sinkgroups.g1.sinks = default-sink k1
      default-agent.sinkgroups.g1.processor.type = failover
      default-agent.sinkgroups.g1.processor.priority.default-sink = 10
      default-agent.sinkgroups.g1.processor.priority.k1 = 5
      default-agent.sinks.default-sink.hostname = ****
      default-agent.sinks.default-sink.port = ****
      default-agent.sinks.k1.hostname = ****
      default-agent.sinks.k1.port = ****
      default-agent.sinks.default-sink.batch-size = 2000
      default-agent.sinks.k1.batch-size = 2000
      default-agent.sinks.k1.type = avro
      default-agent.sinks.k1.channel = default-channel
      Parameter | Description
      default-agent.sinks | The names of all sinks, separated by spaces. Example: default-sink k1.
      default-agent.sources | The names of all sources. Example: default-source.
      default-agent.channels | The names of all channels. Example: default-channel.
      default-agent.sources.default-source.filegroups.f1 | The path of the audit log files to collect. Default value: /mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.*.
      default-agent.sources.default-source.positionFile | Optional. The path where the position file is stored. Default value: ~/.flume/taildir_position.json.
      default-agent.channels.default-channel.checkpointDir | Optional. The path where the checkpoint file is stored.
      default-agent.channels.default-channel.dataDirs | The path where events are stored.
      default-agent.channels.default-channel.capacity | Optional. Set this parameter based on the settings of the HDFS rolling parameters in Step 3.
      default-agent.sources.default-source.batchSize | The maximum number of events written to the channel at a time. Example: 2000.
      default-agent.channels.default-channel.transactionCapacity | The maximum number of events that the channel takes from a source or pushes to a sink in a single transaction. Example: 2000.
      default-agent.sources.default-source.ignoreRenameWhenMultiMatching | If a Flume taildir source uses wildcards to match rotated log4j log files in a file group, data may be duplicated. Set this parameter to true to prevent the duplication.
      default-agent.sinkgroups | The names of all sink groups. Example: g1.
      default-agent.sinkgroups.g1.sinks | The names of all sinks in the sink group g1, separated by spaces. Example: default-sink k1.
      default-agent.sinkgroups.g1.processor.type | The processing logic type of the sink group g1. Valid values: default (only one sink is used), failover (failover is allowed), and load_balance (load balancing is allowed).
      default-agent.sinkgroups.g1.processor.priority.default-sink | The priority of default-sink in the sink group g1. The sink with the higher value is preferred. Example: 10.
      default-agent.sinkgroups.g1.processor.priority.k1 | The priority of the k1 sink in the sink group g1. The sink with the higher value is preferred. Example: 5.
      default-agent.sinks.default-sink.hostname | The IP address of the core-1-1 node.
      default-agent.sinks.default-sink.port | The port number of the Flume agent on the core-1-1 node.
      default-agent.sinks.k1.hostname | The IP address of the core-1-2 node.
      default-agent.sinks.k1.port | The port number of the Flume agent on the core-1-2 node.
      default-agent.sinks.default-sink.batch-size | The number of events sent by default-sink at a time. Example: 2000.
      default-agent.sinks.k1.batch-size | The number of events sent by the k1 sink at a time. Example: 2000.
      default-agent.sinks.k1.type | The type of the k1 sink. Example: avro.
      default-agent.sinks.k1.channel | The channel from which the k1 sink reads events. Example: default-channel.
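
The effect of the failover priorities can be pictured with a small model: among the sinks that are currently usable, the one with the highest priority value handles the events. The following Python sketch is a simplified illustration, not Flume's implementation. The function pick_active_sink and the healthy set are hypothetical, and the sketch does not model the back-off that Flume applies to failed sinks.

```python
def pick_active_sink(priorities, healthy):
    """Return the healthy sink with the highest priority, or None if none is healthy."""
    candidates = [name for name in priorities if name in healthy]
    if not candidates:
        return None
    return max(candidates, key=lambda name: priorities[name])


# Priorities from the sample configuration of the sink group g1.
priorities = {"default-sink": 10, "k1": 5}

print(pick_active_sink(priorities, {"default-sink", "k1"}))  # default-sink
print(pick_active_sink(priorities, {"k1"}))                  # k1
```

With the sample priorities, events go to the agent on the core-1-1 node through default-sink, and the k1 sink to the core-1-2 node takes over only when default-sink is unavailable.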
    3. Save the configurations.
      1. Click Save in the lower-left corner.
      2. In the dialog box that appears, enter an execution reason and click Save.
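
The sample configuration sets the source batchSize, both sink batch-size values, and the channel transactionCapacity to 2000. The following Python sketch checks that relationship, on the assumption that a source or sink batch should fit within one channel transaction. The function oversized_batches is hypothetical, and the dictionary holds only the relevant values from the sample.

```python
def oversized_batches(props, agent="default-agent", channel="default-channel"):
    """Return the keys whose batch size exceeds the channel's transactionCapacity."""
    limit = int(props[f"{agent}.channels.{channel}.transactionCapacity"])
    batch_keys = [key for key in props if key.endswith((".batchSize", ".batch-size"))]
    return sorted(key for key in batch_keys if int(props[key]) > limit)


# Relevant values from the master node sample configuration.
MASTER = {
    "default-agent.sources.default-source.batchSize": "2000",
    "default-agent.sinks.default-sink.batch-size": "2000",
    "default-agent.sinks.k1.batch-size": "2000",
    "default-agent.channels.default-channel.transactionCapacity": "2000",
}

print(oversized_batches(MASTER))
```

An empty list means that no batch size is larger than the transaction capacity of the channel.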
  6. Start the Flume agents.
    1. In the upper-right corner, choose More > Restart.
    2. In the dialog box that appears, set the Execution Reason parameter and click OK.
    3. In the Confirm message, click OK.
      Audit logs are synchronized to HDFS.

      The logs of Flume agents are stored in the /var/log/emr/flume/default-agent/flume.log file.
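
If audit logs do not appear in HDFS after the restart, the agent log is a reasonable first place to look. The following Python sketch filters warning and error lines from that log on a node. It assumes that each log line contains the level as a separate token such as ERROR or WARN, which may not match your log layout. The function problem_lines is hypothetical.

```python
from pathlib import Path

# Log file location stated in this topic.
LOG_PATH = Path("/var/log/emr/flume/default-agent/flume.log")


def problem_lines(lines, levels=("ERROR", "WARN")):
    """Return the lines that contain one of the given levels as a separate token."""
    return [
        line.rstrip("\n")
        for line in lines
        if any(f" {level} " in line for level in levels)
    ]


# Run on a cluster node. On other machines the file does not exist and nothing is printed.
if LOG_PATH.exists():
    with LOG_PATH.open(errors="replace") as handle:
        for line in problem_lines(handle):
            print(line)
```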