Object Storage Service: Use Flume to synchronize data from an EMR Kafka cluster to a bucket with OSS-HDFS enabled

Last Updated: Sep 21, 2023

This topic describes how to use Flume to synchronize data from an EMR Kafka cluster to an Object Storage Service (OSS) bucket for which OSS-HDFS is enabled.

Prerequisites

Procedure

  1. Configure Flume.

    1. Go to the Flume configuration page.

      1. Go to the EMR on ECS page of the EMR console.

      2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.

      3. On the EMR on ECS page, find the desired cluster and click Services in the Actions column.

      4. On the Services tab, click Configure in the Flume section.

    2. Set the maximum memory available to the Java Virtual Machine (JVM).

      A large amount of JVM memory is consumed when data is written from Flume to OSS-HDFS. We recommend that you increase the value of the Xmx option for the Flume agent. To increase the maximum JVM memory, perform the following steps:

      1. Click the flume-env.sh tab.

        This topic uses global configuration. If you want to apply the configuration only to a specific node, select Independent Node Configuration from the second drop-down list on the Configure tab of the FLUME service.

      2. Change the value of JAVA_OPTS.

        For example, to set the maximum memory available to the JVM to 1 GB, set the value of JAVA_OPTS to -Xmx1g.

      3. Click Save.
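For reference, in a stock Apache Flume installation the JAVA_OPTS setting corresponds to an export line in flume-env.sh. The following fragment is a minimal sketch that assumes a 1 GB heap is sufficient for your workload. The EMR console manages this file for you, so treat it as an illustration of the resulting setting rather than a file to edit by hand.

```shell
# Illustrative flume-env.sh fragment (assumption: 1 GB heap is enough for your workload).
# -Xmx sets the maximum JVM heap size for the Flume agent.
export JAVA_OPTS="-Xmx1g"
```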

    3. Modify flume-conf.properties.

      1. On the Configure tab, click the flume-conf.properties tab.

        This topic uses global configuration. If you want to apply the configuration only to a specific node, select Independent Node Configuration from the second drop-down list on the Configure tab of the FLUME service.

      2. In the editor next to flume-conf.properties, enter the following configuration items.

        Note

        The agent name default-agent in the following configuration must be the same as the value of the agent_name parameter on the Configure tab of the FLUME service.

        # Name the source, sink, and channel of the agent
        default-agent.sources = source1
        default-agent.sinks = k1
        default-agent.channels = c1

        # Kafka source: consume events from the flume-test topic
        default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
        default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
        default-agent.sources.source1.kafka.topics = flume-test
        default-agent.sources.source1.kafka.consumer.group.id = flume-test-group

        # HDFS sink: write raw event data (DataStream) to OSS-HDFS
        default-agent.sinks.k1.type = hdfs
        default-agent.sinks.k1.hdfs.path = oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>
        default-agent.sinks.k1.hdfs.fileType = DataStream

        # Use a channel which buffers events in memory
        default-agent.channels.c1.type = memory
        default-agent.channels.c1.capacity = 100
        default-agent.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        default-agent.sources.source1.channels = c1
        default-agent.sinks.k1.channel = c1

        Parameter | Description
        default-agent.sources.source1.kafka.bootstrap.servers | The hostnames and port numbers of the brokers in the Kafka cluster.
        default-agent.sinks.k1.hdfs.path | The path that is used to access OSS-HDFS. The path format is oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>. An example path is oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result. Path components: <examplebucket> is the name of the bucket for which the OSS-HDFS service is enabled; <exampleregion> is the ID of the region where the bucket is located; <exampledir> is the name of the directory for the OSS-HDFS service.
        default-agent.channels.c1.capacity | The maximum number of events that are stored in the channel. Modify this parameter based on your business requirements.
        default-agent.channels.c1.transactionCapacity | The maximum number of events that the channel takes from the source or delivers to the sink in a single transaction. Modify this parameter based on your business requirements.
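The OSS-HDFS path format can be assembled from its three components. The following shell sketch uses the example values from this topic (flume-test, cn-hangzhou, and result). The variable names are illustrative only and are not part of any Flume or OSS configuration.

```shell
# Example values from this topic; replace them with your own bucket, region ID, and directory.
bucket="flume-test"
region="cn-hangzhou"
dir="result"

# Compose the path in the oss://<bucket>.<region>.oss-dls.aliyuncs.com/<dir> format.
sink_path="oss://${bucket}.${region}.oss-dls.aliyuncs.com/${dir}"
echo "$sink_path"   # prints oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result
```

The resulting string is the value to use for default-agent.sinks.k1.hdfs.path.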

      3. Click Save.
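Before you save, it can help to confirm that the two channel values are consistent: the Apache Flume memory channel rejects a configuration in which transactionCapacity is greater than capacity. The following is a minimal sketch of that check against the sample values. The conf variable and the get_prop helper are hypothetical names introduced for illustration.

```shell
# Sample channel settings copied from the configuration in this topic.
conf='default-agent.channels.c1.capacity = 100
default-agent.channels.c1.transactionCapacity = 100'

# Hypothetical helper: print the value of the property named $1 from $conf.
get_prop() {
  printf '%s\n' "$conf" | awk -F'=' -v k="$1" '
    { key = $1; gsub(/[ \t]/, "", key)
      if (key == k) { val = $2; gsub(/[ \t]/, "", val); print val } }'
}

capacity=$(get_prop default-agent.channels.c1.capacity)
tx_capacity=$(get_prop default-agent.channels.c1.transactionCapacity)

# The memory channel requires transactionCapacity <= capacity.
if [ "$tx_capacity" -le "$capacity" ]; then
  echo "ok: transactionCapacity ($tx_capacity) <= capacity ($capacity)"
else
  echo "error: transactionCapacity ($tx_capacity) exceeds capacity ($capacity)"
fi
```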

  2. Check the result of data synchronization.

    1. Use Secure Shell (SSH) to log on to the Dataflow cluster. For more information, see Log on to a cluster.

    2. Run the following command to create a topic named flume-test:

      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create

    3. Generate test data.

      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      For example, enter abc and press Enter.

      A file named FlumeData.xxxx is generated in the oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result directory, where xxxx is the UNIX timestamp, in milliseconds, at which the file was generated.
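To read the timestamp in a generated file name, you can convert the millisecond suffix to a date. The following sketch uses a hypothetical file name, FlumeData.1695254400000, and assumes GNU date is available on the node. Files that Flume is still writing carry a .tmp suffix by default, which the sketch strips first.

```shell
# Hypothetical file name; replace it with a name from your output directory.
file_name="FlumeData.1695254400000"

# Strip an optional .tmp suffix, then keep the text after the last dot.
base="${file_name%.tmp}"
ts_ms="${base##*.}"

# Convert milliseconds to seconds and format the result as UTC (GNU date syntax).
created_at=$(date -u -d "@$((ts_ms / 1000))" +"%Y-%m-%dT%H:%M:%SZ")
echo "$created_at"   # prints 2023-09-21T00:00:00Z
```

To find the actual file names, you can list the output directory from a cluster node, for example with hdfs dfs -ls oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result, assuming the Hadoop client on that node is configured for OSS-HDFS access.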