E-MapReduce(简称EMR)从EMR-3.16.0版本开始支持Apache Flume。本文介绍如何通过命令方式,使用Flume同步EMR Kafka集群的数据至EMR Hadoop集群的Hive。

前提条件

  • 已创建Hadoop集群,并且选择了Flume服务,详情请参见创建集群
    说明 Flume软件安装目录在/usr/lib/flume-current下,其他常用文件路径获取方式请参见常用文件路径
  • 已创建Kafka集群,详情请参见创建集群
    说明
    • 如果创建的是Hadoop高安全集群,消费标准Kafka集群的数据,需在Hadoop集群配置Kerberos认证,详情请参见兼容MIT Kerberos认证
    • 如果创建的是Kafka高安全集群,通过Flume将数据写入标准Hadoop集群,请参见 Kerberos Kafka Source
    • 如果创建的Hadoop集群和Kafka集群都是高安全集群,需配置跨域互信,详情请参见跨域互信,其它配置请参见跨域互信使用Flume

同步Kafka数据至Hive

  1. 通过SSH方式连接Hadoop集群。
    详情请参见登录集群
  2. 创建Hive表。
    Flume使用事务操作将数据写入Hive,需要在创建Hive表(flume_test)时设置transactional属性。
    create table flume_test (id int, content string)
    clustered by (id) into 2 buckets stored as orc  TBLPROPERTIES('transactional'='true');
  3. 配置Flume。
    1. 进入目录/etc/ecm/flume-conf
      cd /etc/ecm/flume-conf
    2. 创建配置文件flume.properties
      vim flume.properties 
    3. 添加如下内容。
      a1.sources = source1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.source1.channels = c1
      a1.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      a1.sources.source1.kafka.topics = flume-test
      a1.sources.source1.kafka.consumer.group.id = flume-test-group
      
      # Describe the sink
      a1.sinks.k1.type = hive
      a1.sinks.k1.hive.metastore = thrift://xxxx:9083
      a1.sinks.k1.hive.database = default
      a1.sinks.k1.hive.table = flume_test
      a1.sinks.k1.serializer = DELIMITED
      a1.sinks.k1.serializer.delimiter = ","
      a1.sinks.k1.serializer.serdeSeparator = ','
      a1.sinks.k1.serializer.fieldnames =id,content
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = <100>
      a1.channels.c1.transactionCapacity = <100>
      
      a1.sources.source1.channels = c1
      a1.sinks.k1.channel = c1
      • a1.sources.source1.kafka.bootstrap.servers:Kafka集群Broker的Host和端口号。
      • a1.channels.c1.capacity:通道中存储的最大事件数。请根据实际环境修改该参数值。
      • a1.channels.c1.transactionCapacity:每个事务通道将从源接收或提供给接收器的最大事件数。请根据实际环境修改该参数值。
      • a1.sinks.k1.hive.metastore:Hive metastore的URI,格式为thrift://emr-header-1.cluster-xxx:9083。其中emr-header-1.cluster-xxx您可以通过hostname获取。
  4. 执行如下命令,启动服务。
    flume-ng agent --name a1 --conf /etc/ecm/flume-conf --conf-file flume.properties
  5. 测试数据同步情况。
    1. 通过SSH方式连接Kafka集群,详情请参见登录集群
    2. 创建名称为flume-test的Topic。
      /usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic flume-test --create
    3. 生成测试数据。
      kafka-console-producer.sh --topic flume-test --broker-list emr-header-1:9092

      例如输入1,a并回车。

    4. 通过SSH方式连接Hadoop集群,在客户端配置Hive参数并查询表中的数据。
      set hive.support.concurrency=true;
      set hive.exec.dynamic.partition.mode=nonstrict;
      set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
      配置好后查询flume_test表中的数据。
      select * from flume_test;
      返回信息如下:
      OK
      1    a

消费Kerberos Kafka source

消费高安全Kafka集群的数据时,需要完成额外的配置:
  • 在Kafka集群配置Kerberos认证,将生成的test.keytab文件拷贝至Hadoop集群的/etc/ecm/flume-conf路径下,详情请参见兼容MIT Kerberos认证;将Kafka集群的/etc/ecm/has-conf/krb5.conf文件拷贝至Hadoop集群的/etc/ecm/flume-conf路径下。
  • 配置flume.properties
    flume.properties中添加如下配置。
    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  • 配置Kafka client。
    • /etc/ecm/flume-conf下创建文件flume_jaas.conf
      KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/ecm/flume-conf/test.keytab"
        serviceName="kafka"
        principal="test@EMR.${realm}.COM";
      };

      ${realm} 需要替换为Kafka集群的Kerberos realm。

      ${realm}获取方式:在Kafka集群执行命令hostname,得到形式为emr-header-1.cluster-xxx的主机名,例如emr-header-1.cluster-123456,其中数字串123456即为realm。

    • 修改/etc/ecm/flume-conf/flume-env.sh
      初始情况下,/etc/ecm/flume-conf/下没有flume-env.sh 文件,需要拷贝flume-env.sh.template并重命名为flume-env.sh。在flume-env.sh文件末尾添加如下内容。
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/etc/ecm/flume-conf/krb5.conf"
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"
  • 设置域名。
    将Kafka集群各节点的长域名和IP的绑定信息添加到Hadoop集群的/etc/hosts文件末尾。长域名的形式为emr-header-1.cluster-xxxx域名
    说明 图中标注①表示的是Hadoop集群的域名;图中标注②表示新增加的Kafka集群域名。

跨域互信使用Flume

在配置了跨域互信后,其他配置如下:
  • 在Kafka集群配置Kerberos认证,将生成的keytab文件test.keytab拷贝至Hadoop集群的/etc/ecm/flume-conf路径下,详情请参见兼容MIT Kerberos认证
  • 配置flume.properties
    flume.properties中添加如下配置。
    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  • 配置Kafka client。
    • /etc/ecm/flume-conf下创建文件flume_jaas.conf,内容如下。
      KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/ecm/flume-conf/test.keytab"
        serviceName="kafka"
        principal="test@EMR.${realm}.COM";
      };

      ${realm}替换为Kafka集群的Kerberos realm。

      ${realm}获取方式:在Kafka集群执行命令hostname,得到形式为emr-header-1.cluster-xxx的主机名,例如emr-header-1.cluster-123456,其中数字串123456即为realm。

    • 修改/etc/ecm/flume-conf/flume-env.sh
      初始情况下,/etc/ecm/flume-conf/下没有flume-env.sh文件,需要拷贝flume-env.sh.template并重命名为flume-env.sh。在flume-env.sh文件末尾添加如下内容。
      export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"