Apache Flume is supported in E-MapReduce (EMR) V3.16.0 and later. This topic describes how to use Flume to synchronize data from an EMR Kafka cluster to Hive in an EMR Hadoop cluster by using the command-line interface (CLI).

Prerequisites

  • A Hadoop cluster is created, and Flume is selected from the optional services during cluster creation. For more information about how to create a cluster, see Create a cluster.
    Note The Flume software package is stored in the /usr/lib/flume-current path. For more information about the paths of common EMR files, see Common file paths.
  • A Kafka cluster is created. For more information about how to create a cluster, see Create a cluster.

Synchronize data from a Kafka cluster to Hive

  1. Connect to the master node of your Hadoop cluster in SSH mode.
    For more information about how to connect to the master node, see Connect to the master node of an EMR cluster in SSH mode.
  2. Create a Hive table.
    To allow Flume to write data to Hive by performing transactional operations, you must set the transactional property to true when you create the Hive table. In this example, a table named flume_test is created.
    create table flume_test (id int, content string)
    clustered by (id) into 2 buckets
    stored as orc
    TBLPROPERTIES ('transactional'='true');
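    To confirm that transactions are enabled on the table, you can check its table properties. A minimal sketch, assuming the hive CLI is available on the PATH:
    hive -e "SHOW TBLPROPERTIES flume_test;"
    # The output is expected to include: transactional  true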
  3. Configure Flume.
    1. Go to the /etc/ecm/flume-conf directory.
      cd /etc/ecm/flume-conf
    2. Create a configuration file named flume.properties.
      vim flume.properties 
    3. Add the following configurations to the configuration file:
      a1.sources = source1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.source1.channels = c1
      a1.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      a1.sources.source1.kafka.topics = flume-test
      a1.sources.source1.kafka.consumer.group.id = flume-test-group
      
      # Describe the sink
      a1.sinks.k1.type = hive
      a1.sinks.k1.hive.metastore = thrift://xxxx:9083
      a1.sinks.k1.hive.database = default
      a1.sinks.k1.hive.table = flume_test
      a1.sinks.k1.serializer = DELIMITED
      a1.sinks.k1.serializer.delimiter = ","
      a1.sinks.k1.serializer.serdeSeparator = ','
      a1.sinks.k1.serializer.fieldnames = id,content
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = <100>
      a1.channels.c1.transactionCapacity = <100>
      
      a1.sources.source1.channels = c1
      a1.sinks.k1.channel = c1
      • a1.sources.source1.kafka.bootstrap.servers: the hostnames and port numbers of Kafka brokers.
      • a1.channels.c1.capacity: the maximum number of events stored in a channel. Modify the parameter based on your business requirements.
      • a1.channels.c1.transactionCapacity: the maximum number of events that a channel can receive from a source or provide to a sink. Modify the parameter based on your business requirements.
      • a1.sinks.k1.hive.metastore: the Uniform Resource Identifier (URI) of the Hive metastore. Specify the URI in the thrift://emr-header-1.cluster-xxx:9083 format, where emr-header-1.cluster-xxx is the hostname of the master node of the Hadoop cluster. You can run the hostname command on the node to obtain the hostname, as shown in the example below.
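      For example, assuming the hostname command returns emr-header-1.cluster-123456 (a hypothetical cluster ID), the resulting setting would be:
        hostname
        # emr-header-1.cluster-123456
        # -> a1.sinks.k1.hive.metastore = thrift://emr-header-1.cluster-123456:9083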
  4. Run the following command to start Flume:
    flume-ng agent --name a1 --conf /etc/ecm/flume-conf --conf-file flume.properties
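    To keep the agent running after you log off and to capture its log output, you can start it in the background instead. A sketch; the log file name flume-agent.out is arbitrary:
    cd /etc/ecm/flume-conf
    nohup flume-ng agent --name a1 --conf /etc/ecm/flume-conf --conf-file flume.properties > flume-agent.out 2>&1 &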
  5. Test data synchronization.
    1. Connect to the master node of the Kafka cluster in SSH mode. For more information about how to connect to the master node, see Connect to the master node of an EMR cluster in SSH mode.
    2. Run the following command to create a topic named flume-test:
      /usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181/kafka-1.0.0 --topic flume-test --create
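      To verify that the topic was created, you can describe it:
      /usr/lib/kafka-current/bin/kafka-topics.sh --describe --zookeeper emr-header-1:2181/kafka-1.0.0 --topic flume-test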
    3. Run the following command to generate test data:
      /usr/lib/kafka-current/bin/kafka-console-producer.sh --topic flume-test --broker-list emr-header-1:9092

      For example, enter 1,a and press Enter.
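      You can also confirm that the message reached Kafka by reading it back with the console consumer in a separate session (press Ctrl+C to stop). Because Flume uses its own consumer group, this does not interfere with the synchronization:
      /usr/lib/kafka-current/bin/kafka-console-consumer.sh --topic flume-test --bootstrap-server emr-header-1:9092 --from-beginning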

    4. Connect to the master node of the Hadoop cluster in SSH mode and run the following commands in the Hive CLI to configure the required session parameters. Then, query data in the Hive transactional table on the client.
      set hive.support.concurrency=true;
      set hive.exec.dynamic.partition.mode=nonstrict;
      set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
      Run the following command to query data in the flume_test table:
      select * from flume_test;
      The following information is returned:
      OK
      1    a
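      The same check can also be run non-interactively from the shell. A sketch that sets the session parameters and runs the query in a single hive call:
      hive -e "set hive.support.concurrency=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; select * from flume_test;"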

Consume data from a high-security Kafka cluster

If you consume data from a high-security Kafka cluster, you must also perform the following configurations:
  • Configure Kerberos authentication for your Kafka cluster and copy the generated test.keytab file to the /etc/ecm/flume-conf directory of the Hadoop cluster. For more information about how to configure Kerberos authentication, see Configure MIT Kerberos authentication. In addition, copy the krb5.conf file from the /etc/ecm/has-conf/ directory of the Kafka cluster to the /etc/ecm/flume-conf directory of the Hadoop cluster.
  • Configure flume.properties.
    Add the following configurations to the flume.properties file:
    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  • Configure a Kafka client.
    • Create a file named flume_jaas.conf in the /etc/ecm/flume-conf directory. Configure the following information in the file:
      KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/ecm/flume-conf/test.keytab"
        serviceName="kafka"
        principal="test@EMR.${realm}.COM";
      };

      Replace ${realm} with the Kerberos realm of the Kafka cluster.

      You can run the hostname command on a node in the Kafka cluster to obtain a hostname in the emr-header-1.cluster-xxx format. The numeric suffix is the value of ${realm}. For example, if emr-header-1.cluster-123456 is returned, ${realm} is 123456 and the principal is test@EMR.123456.COM.
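      To verify that the keytab and principal are valid before you start Flume, you can request a ticket with kinit. A sketch that uses the hypothetical realm 123456:
      kinit -kt /etc/ecm/flume-conf/test.keytab test@EMR.123456.COM
      klist
      # klist should list a ticket for test@EMR.123456.COM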

    • Modify the flume-env.sh file in the /etc/ecm/flume-conf/ directory.
      By default, no file named flume-env.sh exists in the /etc/ecm/flume-conf/ directory. You must copy the flume-env.sh.template file and rename it flume-env.sh. Add the following configurations to the end of the flume-env.sh file:
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/etc/ecm/flume-conf/krb5.conf"
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"
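      For example, the following commands create the file from the template and append both settings (the single quotation marks keep $JAVA_OPTS literal in the file):
      cd /etc/ecm/flume-conf
      cp flume-env.sh.template flume-env.sh
      echo 'export JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/etc/ecm/flume-conf/krb5.conf"' >> flume-env.sh
      echo 'export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"' >> flume-env.sh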
  • Configure domain names.
    Add the long domain names and IP addresses of the nodes in the Kafka cluster to the end of the /etc/hosts file on the Hadoop cluster. A long domain name is in the format of emr-header-1.cluster-xxxx.
    [Figure: Domains. The hosts file entries for the Hadoop cluster are marked with 1 and those for the Kafka cluster are marked with 2.]
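    For example, the appended entries might look as follows. The IP addresses and the cluster ID 123456 are hypothetical placeholders:
    10.0.0.10  emr-header-1.cluster-123456
    10.0.0.11  emr-worker-1.cluster-123456
    10.0.0.12  emr-worker-2.cluster-123456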

Use Flume after cross-region access is configured

After you configure cross-region access, perform the following configurations to use Flume:
  • Configure Kerberos authentication for your Kafka cluster and copy the generated file test.keytab to the /etc/ecm/flume-conf directory of the Hadoop cluster. For more information about how to configure Kerberos authentication, see Configure MIT Kerberos authentication.
  • Configure the flume.properties file.
    Add the following configurations to the flume.properties file:
    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  • Configure a Kafka client.
    • Create a file named flume_jaas.conf in the /etc/ecm/flume-conf directory. Configure the following information in the file:
      KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/ecm/flume-conf/test.keytab"
        serviceName="kafka"
        principal="test@EMR.${realm}.COM";
      };

      Replace ${realm} with the Kerberos realm of the Kafka cluster.

      You can run the hostname command on a node in the Kafka cluster to obtain a hostname in the emr-header-1.cluster-xxx format. The numeric suffix is the value of ${realm}. For example, if emr-header-1.cluster-123456 is returned, ${realm} is 123456 and the principal is test@EMR.123456.COM.

    • Modify the flume-env.sh file in the /etc/ecm/flume-conf/ directory.
      By default, no file named flume-env.sh exists in the /etc/ecm/flume-conf/ directory. You must copy the flume-env.sh.template file and rename it flume-env.sh. Add the following configurations to the end of the flume-env.sh file:
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"