Object Storage Service:Use Flume to synchronize data from an EMR Kafka cluster to a bucket with OSS-HDFS enabled

Last Updated: Mar 20, 2026

Stream data from an E-MapReduce (EMR) Kafka cluster to OSS-HDFS using Apache Flume. This guide shows you how to configure a Flume agent with a Kafka Source and an OSS-HDFS Sink, then verify that data flows end to end.

Prerequisites

Before you begin, ensure that you have:

  - An EMR DataFlow cluster with the Kafka and Flume services installed.
  - A bucket with OSS-HDFS enabled.

Configure Flume

Go to the Flume configuration page

  1. Log on to the EMR console.

  2. In the left-side navigation pane, click EMR on ECS.

  3. In the top menu bar, select a region and resource group.

  4. On the EMR on ECS page, click Cluster Service in the Actions column for your cluster.

  5. On the Cluster Service tab, click Configure in the FLUME service section.

Set the maximum JVM heap size

Flume consumes significant Java Virtual Machine (JVM) memory when writing to OSS-HDFS. Increase the maximum heap size (-Xmx) to avoid out-of-memory errors.

  1. Click the flume-env.sh tab.

    The steps below describe the global configuration method. To configure by node instead, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.
  2. Update the JAVA_OPTS parameter. To set the maximum JVM heap to 1 GB, set the value to -Xmx1g (a sketch of the resulting entry follows these steps).

  3. Click Save.
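
For reference, the resulting entry in flume-env.sh would look similar to the following sketch. The exact contents of JAVA_OPTS vary by cluster, so append -Xmx1g to the existing value rather than overwriting other flags.

    # Cap the Flume agent's maximum JVM heap at 1 GB while
    # preserving any options that are already set.
    export JAVA_OPTS="$JAVA_OPTS -Xmx1g"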

Configure flume-conf.properties

  1. Click the flume-conf.properties tab.

    The steps below describe the global configuration method. To configure by node instead, select Independent Node Configuration from the drop-down list on the Configuration page of the FLUME service.
  2. Add the following configuration to the editor.

    Important

    The agent name default-agent in the configuration must match the agent_name parameter value on the Configuration page of the FLUME service.

       # Agent topology
       default-agent.sources = source1
       default-agent.sinks = k1
       default-agent.channels = c1
    
       # Source: Kafka
       default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
       default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
       default-agent.sources.source1.kafka.topics = flume-test
       default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
    
       # Sink: OSS-HDFS
       default-agent.sinks.k1.type = hdfs
       default-agent.sinks.k1.hdfs.path = oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}
       default-agent.sinks.k1.hdfs.fileType = DataStream
    
       # Channel: memory buffer
       default-agent.channels.c1.type = memory
       default-agent.channels.c1.capacity = 100
       default-agent.channels.c1.transactionCapacity = 100
    
       # Bind Source and Sink to the Channel
       default-agent.sources.source1.channels = c1
       default-agent.sinks.k1.channel = c1
  3. Replace the placeholders with your actual values (a filled-in example follows these steps):

    Placeholder                               | Description                                      | Example
    <kafka-host1:port1,kafka-host2:port2...>  | Hostnames and port numbers of the Kafka brokers  | broker-1:9092,broker-2:9092
    {yourBucketName}                          | Name of the bucket with OSS-HDFS enabled         | my-bucket
    {yourBucketRegion}                        | Region ID where the bucket is located            | cn-hangzhou
    {path}                                    | Target directory in OSS-HDFS                     | result
  4. Review the key configuration parameters:

    Parameter                                              | Description
    default-agent.sources.source1.kafka.bootstrap.servers | Kafka broker addresses (comma-separated host:port pairs)
    default-agent.sinks.k1.hdfs.path                       | OSS-HDFS output path in the format oss://{yourBucketName}.{yourBucketRegion}.oss-dls.aliyuncs.com/{path}, for example oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result
    default-agent.channels.c1.capacity                     | Maximum number of events stored in the channel. Adjust based on your throughput.
    default-agent.channels.c1.transactionCapacity          | Maximum number of events each transaction reads from the Source or writes to the Sink. Adjust based on your throughput.
  5. Click Save.
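
For reference, after the placeholders are replaced with the example values from the table above, the source and sink settings would look like the following. The broker addresses, bucket name, and region are illustrative; substitute your own. If you raise capacity and transactionCapacity for higher throughput, keep transactionCapacity less than or equal to capacity.

    # Kafka brokers from the example values
    default-agent.sources.source1.kafka.bootstrap.servers = broker-1:9092,broker-2:9092
    # Example bucket in the cn-hangzhou region, writing to the result directory
    default-agent.sinks.k1.hdfs.path = oss://my-bucket.cn-hangzhou.oss-dls.aliyuncs.com/result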

Verify data synchronization

  1. Use Secure Shell (SSH) to log on to the DataFlow cluster. See Log on to a cluster.

  2. Create a Kafka topic named flume-test:

       kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
  3. Start a console producer and send a test message:

       kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

    Type abc and press Enter.

  4. Confirm that Flume has written the data to OSS-HDFS. A file named FlumeData.xxxx (where xxxx is the creation timestamp in milliseconds) should appear at the output path:

       oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result/FlumeData.xxxx
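
To inspect the output from the cluster shell, you can use the Hadoop filesystem commands, assuming your cluster's Hadoop client is configured to access OSS-HDFS. The bucket, region, and path below are the example values used throughout this guide.

    # List the files that Flume wrote to the output path
    hadoop fs -ls oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result/
    # Print the file contents; the test message "abc" should appear
    hadoop fs -cat oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result/FlumeData.*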