This topic describes how to use Flume to synchronize data from an E-MapReduce (EMR) Kafka cluster to the HBase service of an EMR data serving cluster.

Prerequisites

  • An EMR data lake cluster is created, and Flume is selected from the optional services during cluster creation. For more information, see Create a cluster.
  • An EMR data serving cluster is created. For more information, see Create a cluster.
  • An EMR Dataflow cluster is created and the Kafka service is selected from the optional services during cluster creation. For more information, see Create a cluster.

Procedure

  1. Create an HBase table.
    1. Use Secure Shell (SSH) to log on to the data serving cluster. For more information, see Log on to a cluster.
    2. Run the following command to connect to HBase:
      hbase shell
    3. Create an HBase table named flume_test. The table contains a column family.
      create 'flume_test','column'
  2. Configure the Flume service.
    1. Go to the Configure tab of the Flume service.
      1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
      2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
      3. On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.
      4. On the Services tab, click Configure in the Flume service section.
    2. On the Configure tab, click the flume-conf.properties subtab.
      In this example, the global configuration is used. If you want to configure the cluster by node, you can select Independent Node Configuration from the drop-down list on the Configure subtab of the Flume service.
    3. Add the following content to the value of the flume-conf.properties parameter:
      Note The value of the default-agent parameter in the sample code must be the same as that of the agent_name parameter on the Configure tab of the Flume service.
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      default-agent.sinks.k1.type = hbase
      default-agent.sinks.k1.table = flume_test
      default-agent.sinks.k1.columnFamily = column
      
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      Parameter Description
      default-agent.sources.source1.kafka.bootstrap.servers The servers and port numbers of the brokers in the Kafka cluster.
      default-agent.sinks.k1.table The name of the HBase table.
      default-agent.sinks.k1.columnFamily The name of the column family.
      default-agent.channels.c1.capacity The maximum number of events that are stored in the channel. Modify the value of this parameter based on your business requirements.
      default-agent.channels.c1.transactionCapacity The maximum number of events that each transaction channel receives from the source or provides for the receiver. Modify the value of this parameter based on your business requirements.
  3. Start the Flume service.
    1. On the Status tab of the Flume service, find the FlumeAgent component and choose More > Restart in the Actions column.
    2. In the dialog box that appears, enter an execution reason and click OK.
    3. In the Confirm message, click OK.
  4. Test data writes.
    Log on to the Dataflow cluster, and run the kafka-console-producer.sh command to generate test data. After HBase is connected to the data serving cluster, you can view the test data. HBase