Apache Flume is supported in E-MapReduce (EMR) V3.16.0 and later. This topic describes how to use Flume to synchronize data from an EMR Kafka cluster to HBase of an EMR Hadoop cluster by using the command-line interface (CLI).

Prerequisites

  • A Hadoop cluster is created, and Flume and HBase are selected from the optional services during cluster creation. For more information about how to create a cluster, see Create a cluster.
    Note The Flume software package is installed in the /usr/lib/flume-current directory. For more information about the paths of common EMR files, see Common file paths.
  • A Kafka cluster is created. For more information about how to create a cluster, see Create a cluster.

Synchronize data from a Kafka cluster to HBase

  1. Connect to the master node of your Hadoop cluster in SSH mode.
    For more information about how to connect to the master node, see Connect to the master node of an EMR cluster in SSH mode.
  2. Create an HBase table named flume_test with a column family named column. In the HBase shell, run the following command:
    create 'flume_test','column'
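    The following sketch shows this step end to end: open the HBase shell on the master node, create the table, and confirm that it exists. The hbase shell, create, and describe commands are standard HBase tooling.
    hbase shell                     # open the HBase shell
    create 'flume_test','column'    # create the table with one column family named column
    describe 'flume_test'           # confirm that the table exists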
  3. Configure Flume.
    1. Go to the /etc/ecm/flume-conf directory.
      cd /etc/ecm/flume-conf
    2. Create a configuration file named flume.properties.
      vim flume.properties 
    3. Add the following configurations to the configuration file:
      a1.sources = source1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.source1.channels = c1
      a1.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      a1.sources.source1.kafka.topics = flume-test
      a1.sources.source1.kafka.consumer.group.id = flume-test-group
      
      a1.sinks.k1.type = hbase
      a1.sinks.k1.table = flume_test
      a1.sinks.k1.columnFamily = column
      
      
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = <100>
      a1.channels.c1.transactionCapacity = <100>
      
      # Bind the source and sink to the channel
      a1.sources.source1.channels = c1
      a1.sinks.k1.channel = c1
      • a1.sources.source1.kafka.bootstrap.servers: the hostnames and port numbers of Kafka brokers.
      • a1.sinks.k1.table: the name of the HBase table.
      • a1.sinks.k1.columnFamily: the column family name.
      • a1.channels.c1.capacity: the maximum number of events stored in a channel. Modify the parameter based on your business requirements.
      • a1.channels.c1.transactionCapacity: the maximum number of events that a channel can receive from a source or provide to a sink in each transaction. This value cannot exceed the value of a1.channels.c1.capacity. Modify the parameter based on your business requirements.
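      For reference, the following sketch shows the placeholder values filled in. The broker addresses are hypothetical examples; replace them with the internal addresses of the brokers in your Kafka cluster. Kafka brokers listen on port 9092 by default.
      a1.sources.source1.kafka.bootstrap.servers = emr-worker-1.cluster-123456:9092,emr-worker-2.cluster-123456:9092
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 1000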
  4. Run the following command to start Flume:
    flume-ng agent --name a1 --conf /etc/ecm/flume-conf --conf-file flume.properties
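    The preceding command runs the agent in the foreground. To keep the agent running after you close the SSH session, you can start it in the background, as shown in the following sketch. The -Dflume.root.logger option is a standard Flume logging setting, and the log file path is an example:
    nohup flume-ng agent --name a1 --conf /etc/ecm/flume-conf \
      --conf-file flume.properties -Dflume.root.logger=INFO,console \
      > /tmp/flume-a1.log 2>&1 &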
  5. Test data writes.
    Use kafka-console-producer.sh to write test data to the flume-test topic in the Kafka cluster. After Flume consumes the data, you can query it in the flume_test table in HBase.
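    The following sketch shows one way to run the test. The broker address is a hypothetical placeholder; kafka-console-producer.sh and the HBase shell are standard tooling.
    # On the Kafka cluster: write test records to the flume-test topic.
    # Type one record per line and press Ctrl+C to exit.
    kafka-console-producer.sh --broker-list emr-worker-1.cluster-123456:9092 --topic flume-test

    # On the Hadoop cluster: scan the target table in the HBase shell.
    hbase shell
    scan 'flume_test'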

Consume data from a high-security Kafka cluster

If you consume data from a high-security Kafka cluster, you must also perform the following configurations:
  • Configure Kerberos authentication for your Kafka cluster, and copy the generated keytab file test.keytab to the /etc/ecm/flume-conf directory of the Hadoop cluster. Also copy the krb5.conf file from the /etc/ecm/has-conf/ directory of the Kafka cluster to the /etc/ecm/flume-conf directory of the Hadoop cluster. For more information about how to configure Kerberos authentication, see Configure MIT Kerberos authentication.
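    The following sketch shows the copy steps, run from the master node of the Hadoop cluster. The source host and the keytab path are placeholders:
    scp root@<kafka-cluster-master-ip>:/path/to/test.keytab /etc/ecm/flume-conf/
    scp root@<kafka-cluster-master-ip>:/etc/ecm/has-conf/krb5.conf /etc/ecm/flume-conf/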
  • Configure flume.properties.
    Add the following configurations to the flume.properties file:
    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  • Configure a Kafka client.
    • Create a file named flume_jaas.conf in the /etc/ecm/flume-conf directory. Configure the following information in the file:
      KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/ecm/flume-conf/test.keytab"
        serviceName="kafka"
        principal="test@EMR.${realm}.COM";
      };

      Replace ${realm} with the cluster ID of the Kafka cluster. The Kerberos realm of an EMR cluster is in the EMR.${realm}.COM format.

      To obtain the cluster ID, run the hostname command on a node of the Kafka cluster. A hostname in the emr-header-1.cluster-xxx format is returned, and the digits after cluster- are the cluster ID. For example, if emr-header-1.cluster-123456 is returned, replace ${realm} with 123456. In this case, the principal is test@EMR.123456.COM.

    • Modify the flume-env.sh file in the /etc/ecm/flume-conf/ directory.
      By default, no file named flume-env.sh exists in the /etc/ecm/flume-conf/ directory. You must copy the flume-env.sh.template file and rename it flume-env.sh. Add the following configurations to the end of the flume-env.sh file:
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/etc/ecm/flume-conf/krb5.conf"
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"
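      The following sketch shows the copy-and-append steps. The single quotation marks keep $JAVA_OPTS unexpanded in the file:
      cd /etc/ecm/flume-conf
      cp flume-env.sh.template flume-env.sh
      echo 'export JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/etc/ecm/flume-conf/krb5.conf"' >> flume-env.sh
      echo 'export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"' >> flume-env.sh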
  • Configure domain names.
    Add the long domain names and IP addresses of the nodes in the Kafka cluster to the end of the /etc/hosts file on the Hadoop cluster. A long domain name is in the format of emr-header-1.cluster-xxxx.
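    For example, the appended entries might look like the following. The IP addresses and the cluster ID are placeholders:
    <ip-of-kafka-emr-header-1>  emr-header-1.cluster-123456
    <ip-of-kafka-emr-worker-1>  emr-worker-1.cluster-123456
    <ip-of-kafka-emr-worker-2>  emr-worker-2.cluster-123456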

Use Flume after cross-region access is configured

After you configure cross-region access, perform the following configurations to use Flume:
  • Configure Kerberos authentication for your Kafka cluster, and copy the generated keytab file test.keytab to the /etc/ecm/flume-conf directory of the Hadoop cluster. For more information about how to configure Kerberos authentication, see Configure MIT Kerberos authentication.
  • Configure the flume.properties file.
    Add the following configurations to the flume.properties file:
    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  • Configure a Kafka client.
    • Create a file named flume_jaas.conf in the /etc/ecm/flume-conf directory. Configure the following information in the file:
      KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/ecm/flume-conf/test.keytab"
        serviceName="kafka"
        principal="test@EMR.${realm}.COM";
      };

      Replace ${realm} with the cluster ID of the Kafka cluster. The Kerberos realm of an EMR cluster is in the EMR.${realm}.COM format.

      To obtain the cluster ID, run the hostname command on a node of the Kafka cluster. A hostname in the emr-header-1.cluster-xxx format is returned, and the digits after cluster- are the cluster ID. For example, if emr-header-1.cluster-123456 is returned, replace ${realm} with 123456. In this case, the principal is test@EMR.123456.COM.

    • Modify the flume-env.sh file in the /etc/ecm/flume-conf/ directory.
      By default, no file named flume-env.sh exists in the /etc/ecm/flume-conf/ directory. You must copy the flume-env.sh.template file and rename it flume-env.sh. Add the following configurations to the end of the flume-env.sh file:
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"