Apache Flume is supported in E-MapReduce (EMR) V3.16.0 and later. This topic describes how to use Flume to synchronize data from an EMR Kafka cluster to Alibaba Cloud Object Storage Service (OSS).

Prerequisites

  • An EMR Kafka cluster is created.
  • OSS is activated.

Procedure

  1. Create an OSS path. For more information, see Create buckets.
    In this topic, the OSS path is oss://flume-test/result.
  2. Configure Flume.
    1. Connect to the master node of the Kafka cluster in SSH mode.
    2. Modify the OSS cache size or the -Xmx option of Flume. The -Xmx option specifies the maximum heap size of the Java Virtual Machine (JVM).
      When Flume writes data to OSS, it consumes a large amount of JVM memory. You can reduce the OSS cache size or increase the value of the -Xmx option for Flume agents.
      • Modify the OSS cache size.

        Copy the hdfs-site.xml configuration file from /etc/ecm/hadoop-conf to /etc/ecm/flume-conf, and set the smartdata.cache.buffer.size parameter to a smaller value, such as 1048576.
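        In the copied hdfs-site.xml, the parameter takes the standard Hadoop property form; a sketch, using the 1048576 value from above:

```xml
<!-- Add or edit this property in /etc/ecm/flume-conf/hdfs-site.xml.
     1048576 (1 MB) is the example value suggested above. -->
<property>
  <name>smartdata.cache.buffer.size</name>
  <value>1048576</value>
</property>
```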

      • Change the value of the -Xmx option.
        In the /etc/ecm/flume-conf directory of Flume, copy the flume-env.sh.template file and rename it flume-env.sh. Change the value of the -Xmx option in the flume-env.sh file to 1g.
        export JAVA_OPTS="-Xmx1g"
    3. Create a configuration file named flume.properties that contains the following content:
      a1.sources = source1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      a1.sources.source1.kafka.topics = flume-test
      a1.sources.source1.kafka.consumer.group.id = flume-test-group
      
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = oss://flume-test/result
      a1.sinks.k1.hdfs.fileType=DataStream
      
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = <100>
      a1.channels.c1.transactionCapacity = <100>
      
      # Bind the source and sink to the channel
      a1.sources.source1.channels = c1
      a1.sinks.k1.channel = c1
      • a1.sources.source1.kafka.bootstrap.servers: the hostnames and port numbers of Kafka brokers.
      • a1.sinks.k1.hdfs.path: the OSS path.
      • a1.channels.c1.capacity: the maximum number of events stored in a channel. Modify the parameter based on your business requirements.
      • a1.channels.c1.transactionCapacity: the maximum number of events that a channel can receive from a source or provide to a sink. Modify the parameter based on your business requirements.
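      By default, the Flume HDFS sink rolls a new output file every 30 seconds, every 10 events, or every 1,024 bytes, whichever comes first, which can produce many small files in OSS. If that matters for your workload, the sink's standard roll parameters can be added to flume.properties; a sketch with illustrative values:

```properties
# Roll a new file every 5 minutes or at ~128 MB, whichever comes first.
# Setting rollCount to 0 disables the event-count trigger.
# These values are illustrative; tune them for your workload.
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
```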
  3. Start Flume.
    • If you modified the OSS cache size when you configured Flume, use the --classpath parameter to pass in the OSS-related dependencies and the modified configuration file:
      flume-ng agent --name a1 --conf /etc/ecm/flume-conf  --conf-file flume.properties  --classpath "/opt/apps/extra-jars/*:/etc/ecm/flume-conf/hdfs-site.xml"
    • If you modified the -Xmx option for Flume agents when you configured Flume, pass in only the OSS-related dependencies:
      flume-ng agent --name a1 --conf /etc/ecm/flume-conf  --conf-file flume.properties  --classpath "/opt/apps/extra-jars/*"
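      The commands above run the agent in the foreground. For long-running synchronization, you may prefer to start the agent in the background and watch its log instead; a sketch (the log path is illustrative):

```shell
# Start the agent in the background and redirect its output to a log file.
nohup flume-ng agent --name a1 --conf /etc/ecm/flume-conf \
  --conf-file flume.properties \
  --classpath "/opt/apps/extra-jars/*" > /tmp/flume-agent.log 2>&1 &
# Follow the log to confirm the agent started and the sink is writing.
tail -f /tmp/flume-agent.log
```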
  4. Test data synchronization.
    1. Connect to the master node of the Kafka cluster in SSH mode. For more information, see Connect to the master node of an EMR cluster in SSH mode.
    2. Run the following command to create a topic named flume-test:
      /usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181/kafka-1.0.0 --topic flume-test --create
    3. Run the following command to generate test data:
      /usr/lib/kafka-current/bin/kafka-console-producer.sh --topic flume-test --broker-list emr-header-1:9092

      For example, enter abc and press Enter.
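      Before checking OSS, you can confirm that the test messages actually reached the topic by reading them back with the console consumer, using the same broker address as the producer step:

```shell
# Replays all messages in the topic from the beginning;
# you should see the test input (for example, abc). Press Ctrl+C to exit.
/usr/lib/kafka-current/bin/kafka-console-consumer.sh --topic flume-test \
  --bootstrap-server emr-header-1:9092 --from-beginning
```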

    4. Check the generated file. A file named FlumeData.xxxx appears in the oss://flume-test/result directory, where xxxx is the timestamp of the system time, in milliseconds, at which the file was generated.
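      When troubleshooting, the millisecond timestamp in the file name can be converted to a readable time; a sketch using GNU date (the file name below is illustrative, not a real output file):

```shell
f="FlumeData.1625472000000"   # illustrative file name
millis="${f#FlumeData.}"      # strip the prefix to get the millisecond timestamp
# Convert milliseconds to seconds and render as a UTC time string.
date -u -d "@$((millis / 1000))" +"%Y-%m-%d %H:%M:%S UTC"
# prints 2021-07-05 08:00:00 UTC
```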