Object Storage Service: Use Flume to synchronize data from an EMR Kafka cluster to OSS-HDFS

Last Updated: Aug 14, 2025

This topic describes how to use Flume to synchronize data from an E-MapReduce (EMR) Kafka cluster to the Alibaba Cloud OSS-HDFS service.

Prerequisites

  • The OSS-HDFS service is enabled, and permissions to access it are granted. For more information, see Enable the OSS-HDFS service.

  • You have created a DataLake cluster and selected the Flume service. For more information, see Create a cluster.

  • You have created a DataFlow cluster and selected the Kafka service. For more information, see Create a cluster.

Procedure

  1. Configure Flume.

    1. Go to the Flume configuration page.

      1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.

      2. In the top menu bar, select a region and resource group as needed.

      3. On the EMR on ECS page, click Cluster Service in the Actions column for the target cluster.

      4. On the Cluster Service tab, click Configure in the FLUME service section.

    2. Set the maximum available memory (Xmx) for the JVM.

      Flume consumes a large amount of Java Virtual Machine (JVM) memory when it writes data to OSS-HDFS. You can increase the Xmx value for the Flume agent by performing the following steps:

      1. Click the flume-env.sh tab.

        This topic describes the global configuration method. If you want to configure by node, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.

      2. Modify the value of the JAVA_OPTS parameter.

        For example, to set the maximum available JVM memory to 1 GB, change the parameter value to -Xmx1g. An example of the resulting setting is shown after this step.

      3. Click Save.
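
      For reference, the resulting JAVA_OPTS setting in flume-env.sh might look like the following. This is only a sketch: if the parameter already contains other options, keep them and adjust only the -Xmx value.

        export JAVA_OPTS="-Xmx1g"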

    3. Modify the flume-conf.properties configuration.

      1. Click the flume-conf.properties tab.

        This topic describes the global configuration method. If you want to configure by node, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.

      2. In the editor for flume-conf.properties, add the following configuration items.

        Note

        The value of default-agent in the following example must be the same as the value of the agent_name parameter on the Configuration page of the FLUME service.

        default-agent.sources = source1
        default-agent.sinks = k1
        default-agent.channels = c1
        
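        # Kafka source: consume messages from the flume-test topic in the Kafka (DataFlow) cluster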
        default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
        default-agent.sources.source1.channels = c1
        default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
        default-agent.sources.source1.kafka.topics = flume-test
        default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
        
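        # HDFS sink: write the consumed events to the specified OSS-HDFS directory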
        default-agent.sinks.k1.type = hdfs
        default-agent.sinks.k1.hdfs.path = oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}
        default-agent.sinks.k1.hdfs.fileType = DataStream
        
        # Use a channel which buffers events in memory
        default-agent.channels.c1.type = memory
        default-agent.channels.c1.capacity = 100
        default-agent.channels.c1.transactionCapacity = 100
        
        # Bind the source and sink to the channel
        default-agent.sources.source1.channels = c1
        default-agent.sinks.k1.channel = c1

        The parameters in the preceding configuration are described as follows:

        • default-agent.sources.source1.kafka.bootstrap.servers: the hostnames and port numbers of the brokers in the Kafka cluster.

        • default-agent.sinks.k1.hdfs.path: the OSS-HDFS path, in the format oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}. Example: oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result. The variables are described as follows:

          • {yourBucketName}: the name of the bucket for which the OSS-HDFS service is enabled.

          • {yourBucketRegion}: the ID of the region in which the bucket is located.

          • {path}: the directory in OSS-HDFS to which Flume writes data.

        • default-agent.channels.c1.capacity: the maximum number of events stored in the channel. Adjust this value based on your environment.

        • default-agent.channels.c1.transactionCapacity: the maximum number of events that the channel accepts from the source or delivers to the sink in a single transaction. This value cannot exceed the value of capacity. Adjust this value based on your environment.

      3. Click Save.

  2. Test the data synchronization.

    1. Use Secure Shell (SSH) to log on to the DataFlow cluster. For more information, see Log on to a cluster.
    2. Run the following command to create a topic named flume-test:
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
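      (Optional) To confirm that the topic was created, you can list the topics by using the same ZooKeeper address. This command is a sketch; the supported flags depend on your Kafka version:
      kafka-topics.sh --list --zookeeper master-1-1:2181/emr-kafka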
    3. Generate test data.

      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      For example, enter abc and press Enter.

      A file named FlumeData.xxxx is generated in the oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result path. In the filename, xxxx represents the timestamp in milliseconds when the file was generated.
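
      To verify the result, you can list the destination directory from a node on which access to OSS-HDFS is configured, such as a node of the DataLake cluster. The following command is a sketch; replace the bucket name, region, and path with your own values:

      hadoop fs -ls oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result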