E-MapReduce(简称EMR)从EMR-3.16.0版本开始支持Apache Flume。本文介绍如何使用Flume同步EMR Kafka集群的数据至阿里云OSS。

前提条件

  • 已开通OSS服务,详情请参见开通OSS服务
  • 已创建Kafka集群,详情请参见创建集群
    说明
    • 如果创建的是Hadoop高安全集群,消费标准Kafka集群的数据,则需要在Hadoop集群配置Kerberos认证,详情请参见兼容MIT Kerberos认证
    • 如果创建的是Kafka高安全集群,通过Flume将Kafka数据写入Hadoop集群,详情请参见 消费Kerberos Kafka Source
    • 如果创建的Hadoop集群和Kafka集群都是高安全集群,需配置跨域互信,详情请参见跨域互信,其它配置详情请参见跨域互信使用Flume

操作流程

  1. 创建OSS路径,详情请参见创建存储空间
    本文OSS路径为oss://flume-test/result
  2. 配置Flume。
    1. 通过SSH方式连接Kafka集群。
      详情请参见登录集群
    2. 修改OSS缓存大小或设置JVM最大可用内存(Xmx)。
      Flume向OSS写入数据时,因为需要占用较大的JVM内存,所以可以减小OSS缓存或者增大Flume Agent的Xmx。
      • 修改OSS缓存大小。

        hdfs-site.xml配置文件从/etc/ecm/hadoop-conf拷贝至/etc/ecm/flume-conf,改小配置项smartdata.cache.buffer.size的值,例如修改为1048576。

      • 修改Xmx。
        在Flume的配置路径/etc/ecm/flume-conf下,复制配置文件flume-env.sh.template并重命名为flume-env.sh,设置Xmx的值,例如设置为1g。
        export JAVA_OPTS="-Xmx1g"
    3. 创建配置文件flume.properties
      a1.sources = source1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.source1.channels = c1
      a1.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      a1.sources.source1.kafka.topics = flume-test
      a1.sources.source1.kafka.consumer.group.id = flume-test-group
      
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = oss://flume-test/result
      a1.sinks.k1.hdfs.fileType=DataStream
      
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = <100>
      a1.channels.c1.transactionCapacity = <100>
      
      # Bind the source and sink to the channel
      a1.sources.source1.channels = c1
      a1.sinks.k1.channel = c1
      • a1.sources.source1.kafka.bootstrap.servers:Kafka集群Broker的Host和端口号。
      • a1.sinks.k1.hdfs.path:OSS路径。
      • a1.channels.c1.capacity:通道中存储的最大事件数。请根据实际环境修改该参数值。
      • a1.channels.c1.transactionCapacity:每个事务通道将从源接收或提供给接收器的最大事件数。请根据实际环境修改该参数值。
  3. 启动Flume。
    • 如果配置Flume时修改了OSS缓存大小,需要使用--classpath参数传入OSS相关依赖和配置。
      flume-ng agent --name a1 --conf /etc/ecm/flume-conf --conf-file flume.properties --classpath "/opt/apps/extra-jars/*:/etc/ecm/flume-conf/hdfs-site.xml"
    • 如果修改了Flume Agent的Xmx,只需要传入OSS相关依赖。
      flume-ng agent --name a1 --conf /etc/ecm/flume-conf --conf-file flume.properties --classpath "/opt/apps/extra-jars/*"
  4. 测试数据同步情况。
    1. 通过SSH方式连接Kafka集群,详情请参见登录集群
    2. 创建名称为flume-test的Topic。
      /usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic flume-test --create
    3. 生成测试数据。
      kafka-console-producer.sh --topic flume-test --broker-list emr-header-1:9092

      例如输入abc并回车。

    4. 在OSS的oss://flume-test/result路径下会以当前时间的时间戳(毫秒)为后缀生成文件FlumeData.xxxx