This topic describes how to use the Flume service of E-MapReduce (EMR) to synchronize data from LogHub of Log Service to Hadoop Distributed File System (HDFS) of an EMR cluster in real time. The synchronized data is automatically stored in HDFS partitions based on timestamps.

Background information

You can use Logtail of Log Service to collect the data to be synchronized and upload the data to LogHub in real time. You can then use the Flume service of the EMR cluster to synchronize the data from LogHub to HDFS of the cluster.

For more information about how to upload data to LogHub, see Data collection overview.

Prerequisites

An EMR data lake cluster is created, and Flume is selected from the optional services during cluster creation. For more information about how to create a cluster, see Create a cluster.

Procedure

  1. Configure the Flume service.
    1. Go to the Configure tab of the Flume service.
      1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
      2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
      3. On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.
      4. On the Services tab, click Configure in the Flume service section.
    2. On the Configure tab, click the flume-conf.properties subtab.
      In this example, the global configuration is used. If you want to configure the cluster by node, you can select Independent Node Configuration from the drop-down list on the Configure subtab of the Flume service.
    3. Add the following content to the value of the flume-conf.properties configuration item:
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.loghub.LogHubSource
      default-agent.sources.source1.endpoint = <yourLogHubEndpoint>
      default-agent.sources.source1.project = canaltest
      default-agent.sources.source1.logstore = canal
      default-agent.sources.source1.accessKeyId = yHiu*******BG2s
      default-agent.sources.source1.accessKey = ABctuw0M***************iKKljZy
      default-agent.sources.source1.useRecordTime = true
      default-agent.sources.source1.consumerGroup = consumer_1
      
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
      default-agent.sinks.k1.hdfs.fileType = DataStream
      default-agent.sinks.k1.hdfs.rollInterval = 3600
      default-agent.sinks.k1.hdfs.round = true
      default-agent.sinks.k1.hdfs.roundValue = 60
      default-agent.sinks.k1.hdfs.roundUnit = minute
      default-agent.sinks.k1.hdfs.rollSize = 0
      default-agent.sinks.k1.hdfs.rollCount = 0
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 2000
      default-agent.channels.c1.transactionCapacity = 2000
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      The following describes the parameters in the preceding configuration:
      • default-agent.sources.source1.type: Set the value to org.apache.flume.source.loghub.LogHubSource.
      • default-agent.sources.source1.endpoint: The endpoint that is used to access LogHub.
        Note: If you use the virtual private cloud (VPC) or classic network endpoint of Log Service for a region, make sure that the EMR cluster resides in the same region. If you use a public endpoint, make sure that the node on which the Flume agent runs is assigned a public IP address.
      • default-agent.sources.source1.project: The name of the Log Service project.
      • default-agent.sources.source1.logstore: The name of the Logstore.
      • default-agent.sources.source1.accessKeyId: The AccessKey ID of your Alibaba Cloud account.
      • default-agent.sources.source1.accessKey: The AccessKey secret of your Alibaba Cloud account.
      • default-agent.sources.source1.useRecordTime: Specifies whether to use the time when LogHub collected the data as the event timestamp. Default value: false. If an event header does not contain the timestamp property, the time at which the event is received is used as the timestamp and inserted into the header. If the Flume agent starts or stops, or if data synchronization is delayed, the received time can differ from the collection time, and the data is placed in the wrong partitions. To prevent this issue, set this parameter to true so that the time when LogHub collected the data is used as the timestamp.
      • default-agent.sources.source1.consumerGroup: The name of the consumer group. Default value: consumer_1.
      • default-agent.sources.source1.consumerPosition: The position from which the consumer group consumes LogHub data for the first time. Default value: end. Valid values:
        • end: The consumption starts from the latest data.
        • begin: The consumption starts from the earliest data.
        • special: The consumption starts from a specific point in time. If you set this parameter to special, you must also set the startTime parameter to a specific point in time. Unit: seconds. See the sketch after this parameter list.
        The LogHub server records the consumption position of the consumer group after the first data consumption. To change the value of the consumerPosition parameter after the first data consumption, clear the status information of the consumer group or switch to a new consumer group by changing the value of the consumerGroup parameter.
      • default-agent.sources.source1.heartbeatInterval: The interval at which the consumer group sends heartbeats to the server. Unit: milliseconds. Default value: 30000.
      • default-agent.sources.source1.fetchInOrder: Specifies whether the consumer group consumes data with the same key in order. Default value: false.
      • default-agent.sources.source1.batchSize: The maximum number of messages that are written to the channel in one batch. This is a common source batch setting.
      • default-agent.sources.source1.batchDurationMillis: The maximum amount of time, in milliseconds, to wait before a batch of messages is written to the channel. This is a common source batch setting.
      • default-agent.sources.source1.backoffSleepIncrement: The initial backoff time, in milliseconds, for which the source sleeps when LogHub has no data. The backoff increases by this increment each time no data is returned. This is a common source sleep setting.
      • default-agent.sources.source1.maxBackoffSleep: The maximum backoff time, in milliseconds, for which the source sleeps when LogHub has no data. This is a common source sleep setting.
      • default-agent.sinks.k1.hdfs.path: The HDFS path to which data is written. Example: /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H.
      • default-agent.sinks.k1.hdfs.fileType: The type of the file that is saved to HDFS. Set the value to DataStream.
      • default-agent.sinks.k1.hdfs.rollInterval: The interval at which a new file is generated. Unit: seconds. Example: 3600.
      • default-agent.sinks.k1.hdfs.round: Specifies whether the timestamp that is used in the HDFS path is rounded down, which partitions the data in HDFS by time. Default value: true.
      • default-agent.sinks.k1.hdfs.roundValue: If the default-agent.sinks.k1.hdfs.round parameter is set to true, you must configure this parameter together with the default-agent.sinks.k1.hdfs.roundUnit parameter. For example, if the default-agent.sinks.k1.hdfs.roundUnit parameter is set to minute and this parameter is set to 60, data that is generated within a 60-minute window is written to the same time partition, which means that a new partition is used every 60 minutes.
      • default-agent.sinks.k1.hdfs.roundUnit: The time unit that is used to round the timestamp. Default value: minute.
      • default-agent.sinks.k1.hdfs.rollSize: The file size that triggers a roll. When a temporary file reaches this size, the file is closed and a new file is generated. Unit: bytes. A value of 0 specifies that temporary files are not rolled based on file size.
      • default-agent.sinks.k1.hdfs.rollCount: The number of events that triggers a roll. When a temporary file contains this number of events, the file is closed and a new file is generated. A value of 0 specifies that temporary files are not rolled based on the number of events.
      • default-agent.channels.c1.capacity: The maximum number of events that are stored in the channel. Example: 2000.
      • default-agent.channels.c1.transactionCapacity: The maximum number of events that the channel takes from a source or gives to a sink in each transaction. Example: 2000.
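
      If you want a consumer group to start consuming from a specific point in time, the following is a minimal sketch of the related source settings. The consumer group name and the UNIX timestamp are placeholder values for illustration; replace them with your own.

      default-agent.sources.source1.consumerGroup = consumer_2
      default-agent.sources.source1.consumerPosition = special
      # Placeholder UNIX timestamp in seconds; replace it with your own start time.
      default-agent.sources.source1.startTime = 1700000000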

      Configure the parameters of the Flume service based on open source Flume. For more information, see Avro Source, Taildir Source, HDFS Sink, and File Channel.
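
      If you need buffered events to survive a restart of the Flume agent, you can replace the memory channel with a file channel, as described in File Channel. The following is a minimal sketch; the checkpoint and data directories are placeholder paths and must be writable on the node that runs the Flume agent.

      default-agent.channels.c1.type = file
      # Placeholder local directories; choose paths with sufficient disk space.
      default-agent.channels.c1.checkpointDir = /mnt/flume/checkpoint
      default-agent.channels.c1.dataDirs = /mnt/flume/data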

    4. Save the configurations.
      1. Click Save in the lower-left corner.
      2. In the dialog box that appears, enter an execution reason and click Save.
  2. Start the Flume service.
    1. On the Status tab of the Flume service, find the FlumeAgent component and choose More > Restart in the Actions column.
    2. In the dialog box that appears, enter an execution reason and click OK.
    3. In the Confirm message, click OK.
    After the Flume service is started, you can view the synchronized logs in the configured HDFS path, which is partitioned based on timestamps.
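
    For example, with the path pattern /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H that is configured in this topic, data whose timestamp falls within the 08:00 hour on June 11, 2025 would be written to a directory similar to the following. The date is only an illustration.

    /tmp/flume-data/loghub/datetime=250611/hour=08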