This topic describes how to synchronize data from an E-MapReduce (EMR) Dataflow cluster to the HDFS service of an EMR data lake cluster.

Prerequisites

  • An EMR data lake cluster is created, and Flume is selected from the optional services during cluster creation. For more information, see Create a cluster.
  • An EMR Dataflow cluster is created and the Kafka service is selected from the optional services during cluster creation. For more information, see Create a cluster.

Procedure

  1. Configure the Flume service.
    1. Go to the Configure tab of the Flume service.
      1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
      2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
      3. On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.
      4. On the Services tab, click Configure in the Flume service section.
    2. On the Configure tab, click the flume-conf.properties subtab.
      In this example, the global configuration is used. If you want to configure specific nodes separately, select Independent Node Configuration from the drop-down list on the Configure subtab of the Flume service.
    3. Add the following content to the value of the flume-conf.properties parameter:
      Note The agent name default-agent, which prefixes each line in the sample code, must be the same as the value of the agent_name parameter on the Configure tab of the Flume service.
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
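      # Describe the Kafka source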
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      # Describe the sink
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = hdfs://emr-cluster/tmp/flume/test-data
      default-agent.sinks.k1.hdfs.fileType = DataStream
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      The following list describes the key parameters:
      • default-agent.sources.source1.kafka.bootstrap.servers: The hostnames and port numbers of the Kafka brokers in the Dataflow cluster.
      • default-agent.sources.source1.kafka.topics: The Kafka topics from which Flume consumes data.
      • default-agent.channels.c1.capacity: The maximum number of events that can be stored in the channel. Modify this value based on your business requirements.
      • default-agent.channels.c1.transactionCapacity: The maximum number of events that the channel accepts from the source or delivers to the sink in each transaction. Modify this value based on your business requirements.
      • default-agent.sinks.k1.hdfs.path: The HDFS path to which Flume writes data.
      The following sample values are provided for high-availability and common clusters:
      • High-availability cluster
        default-agent.sinks.k1.hdfs.path = hdfs://emr-cluster/tmp/flume/test-data
      • Common cluster
        default-agent.sinks.k1.hdfs.path = hdfs://master-1-1:9000/tmp/flume/test-data
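      If you are not sure whether your data lake cluster runs HDFS in high-availability mode or what its nameservice is named, you can check on that cluster before you set the path. The following command is a minimal check and assumes the default HDFS client configuration of the data lake cluster: for a high-availability cluster, it prints the nameservice name (emr-cluster in this example); for a common cluster, the key is typically not set.
      hdfs getconf -confKey dfs.nameservices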
    4. Save the configurations.
      1. Click Save in the lower-left corner.
      2. In the dialog box that appears, enter an execution reason and click Save.
  2. Start the Flume service.
    1. On the Status tab of the Flume service, find the FlumeAgent component and choose More > Restart in the Actions column.
    2. In the dialog box that appears, enter an execution reason and click OK.
    3. In the Confirm message, click OK.
  3. Test data synchronization.
    1. Use Secure Shell (SSH) to log on to the Dataflow cluster. For more information, see Log on to a cluster.
    2. Run the following command to create a topic named flume-test:
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
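      Optionally, verify that the topic was created. The following command is a quick check that reuses the same ZooKeeper address as the create command:
      kafka-topics.sh --zookeeper master-1-1:2181/emr-kafka --topic flume-test --describe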
    3. Run the following command to generate test data:
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      For example, enter abc and press the Enter key.
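      Optionally, before you check HDFS, you can confirm that the message reached the topic. The following command is a minimal check that reads messages from the beginning of the topic and exits after it prints one message:
      kafka-console-consumer.sh --bootstrap-server master-1-1:9092 --topic flume-test --from-beginning --max-messages 1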

    4. Connect to the data lake cluster by using SSH and view the generated file.
      Flume generates a file named FlumeData.xxxx in HDFS, where xxxx is the timestamp, in milliseconds, at which the file was created. The file contains the test data abc. Run the following command to view the file:
      hdfs dfs -cat /tmp/flume/test-data/<FlumeData.xxxx>
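      If you do not know the exact file name, list the directory first:
      hdfs dfs -ls /tmp/flume/test-data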