This topic describes how to use the Flume service to synchronize data from an E-MapReduce (EMR) Kafka cluster to Alibaba Cloud Object Storage Service (OSS).

Prerequisites

  • OSS is activated and an OSS bucket is created. For more information, see Activate OSS and Create buckets.
  • An EMR data lake cluster is created, and Flume is selected from the optional services during cluster creation. For more information, see Create a cluster.
  • An EMR Dataflow cluster is created and the Kafka service is selected from the optional services during cluster creation. For more information, see Create a cluster.

Procedure

  1. Configure the Flume service.
    1. Go to the Configure tab of the Flume service.
      1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
      2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
      3. On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.
      4. On the Services tab, click Configure in the Flume service section.
    2. Specify the maximum heap size (-Xmx) of the Java virtual machine (JVM).
      Writing data from Flume to OSS consumes a large amount of JVM memory. You can increase the value of the -Xmx option for Flume agents.
      1. Click the flume-env.sh subtab.

        In this example, the global configuration is used. If you want to configure the cluster by node, you can select Independent Node Configuration from the drop-down list on the Configure subtab of the Flume service.

      2. Modify the value of the JAVA_OPTS parameter.

        For example, if you want to set the -Xmx option to 1 GB, you need to change the value of this parameter to -Xmx1g.
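
        After you save and deliver the configuration, the corresponding line in flume-env.sh on the cluster nodes may look similar to the following example. The exact format and any options that are already present depend on your cluster version, so treat this line only as an illustration and keep the existing options:

        export JAVA_OPTS="-Xmx1g"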

    3. On the Configure tab, click the flume-conf.properties subtab.
      In this example, the global configuration is used. If you want to configure the cluster by node, you can select Independent Node Configuration from the drop-down list on the Configure subtab of the Flume service.
    4. Add the following content to the value of the flume-conf.properties parameter:
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = oss://flume-test/result
      default-agent.sinks.k1.hdfs.fileType = DataStream
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = <100>
      default-agent.channels.c1.transactionCapacity = <100>
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      The following list describes the key parameters:
      • default-agent.sources.source1.kafka.bootstrap.servers: the hostnames and port numbers of the brokers in the Kafka cluster.
      • default-agent.sinks.k1.hdfs.path: the OSS path to which data is written. In this example, oss://flume-test/result is used.
      • default-agent.channels.c1.capacity: the maximum number of events that can be stored in the channel. Modify the value of this parameter based on your business requirements.
      • default-agent.channels.c1.transactionCapacity: the maximum number of events that the channel takes from a source or gives to a sink in each transaction. Modify the value of this parameter based on your business requirements.
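      If you need to control how often the sink rolls over to a new file on OSS, you can optionally add the standard rolling parameters of the Flume HDFS sink to the same configuration. The following values are only an example; adjust them based on your business requirements:

      default-agent.sinks.k1.hdfs.rollInterval = 3600
      default-agent.sinks.k1.hdfs.rollSize = 134217728
      default-agent.sinks.k1.hdfs.rollCount = 0

      With these example values, a new file is started every 3,600 seconds or after about 128 MB of data is written, and rolling based on the number of events is disabled.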
    5. Save the configurations.
      1. Click Save in the lower-left corner.
      2. In the dialog box that appears, enter an execution reason and click Save.
  2. Start the Flume service.
    1. On the Status tab of the Flume service, find the FlumeAgent component and choose More > Restart in the Actions column.
    2. In the dialog box that appears, enter an execution reason and click OK.
    3. In the Confirm message, click OK.
  3. Test data synchronization.
    1. Use Secure Shell (SSH) to log on to the Dataflow cluster. For more information, see Log on to a cluster.
    2. Run the following command to create a topic named flume-test:
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
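
      To confirm that the topic was created, you can optionally list the topics that are registered in the same ZooKeeper path:

      kafka-topics.sh --zookeeper master-1-1:2181/emr-kafka --list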
    3. Run the following command to start a console producer and generate test data:
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      For example, enter abc and press the Enter key.
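
      To verify that the message was written to Kafka, you can optionally start a console consumer in another session. The following command assumes the same broker address that is used above:

      kafka-console-consumer.sh --bootstrap-server master-1-1:9092 --topic flume-test --from-beginning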

    4. Check the generated file. A file named FlumeData.xxxx is generated in the oss://flume-test/result directory, where xxxx is the current system timestamp in milliseconds.
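
      If OSS access is configured for Hadoop commands on the cluster, you can optionally list the generated files with a command similar to the following. This is only a convenience check; you can also view the files in the OSS console:

      hadoop fs -ls oss://flume-test/result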