本文以E-MapReduce-Flume实时同步HDFS audit日志至HDFS为例,介绍Flume的使用。

背景信息

E-MapReduce从3.19.0版本开始对EMR-Flume提供集群管理的功能。通过集群管理功能,可以在Web页面方便的配置和管理Flume Agent。示例中,在master实例启动Flume agent,收集本地磁盘中的audit日志通过Avro协议发送数据至core实例,在core实例配置并启动failover sink processor,接收master实例发送的数据并sink到HDFS中。Flume Agent拓扑结构如下图所示。

flume
注意 您可以根据实际情况设置Flume Agent的拓扑结构。

Flume其他使用场景的配置,请参见Flume 配置说明

准备工作

创建E-MapReduce Hadoop集群,在可选服务中选择Flume。详情请参见创建集群

操作步骤

  1. Core实例配置并启动Flume Agent。

    例如在emr-worker-1节点进行操作。

    1. 登录阿里云 E-MapReduce 控制台
    2. 单击上方的集群管理页签。
    3. 集群管理页面,单击集群右侧的详情
    4. 在左侧导航栏中,选择集群服务 > FLUME
    5. 单击配置页签,在配置页面设置如下 。
      default-agent.sinks.default-sink.type hdfs
      default-agent.channels.default-channel.type file
      default-agent.sources.default-source.type avro
      deploy_node_hostname emr-worker-1
    6. 在配置页面通过自定义配置添加如下配置:
      default-agent.sinks.default-sink.hdfs.path 对于高可用集群,使用 hdfs://emr-cluster/path 形式的地址
      default-agent.sinks.default-sink.hdfs.fileType DataStream
      default-agent.sinks.default-sink.hdfs.rollSize 0
      default-agent.sinks.default-sink.hdfs.rollCount 0
      default-agent.sinks.default-sink.hdfs.rollInterval 86400
      default-agent.sinks.default-sink.hdfs.batchSize 51200
      default-agent.sources.default-source.bind 0.0.0.0
      default-agent.sources.default-source.port 根据实际设置
      default-agent.channels.default-channel.transactionCapacity 51200
      default-agent.channels.default-channel.dataDirs channel 存储 event 数据的路径
      default-agent.channels.default-channel.checkpointDir 存储 checkpoint 的路径
      default-agent.channels.default-channel.capacity 根据 hdfs roll 进行设置
    7. 保存配置后启动Flume agent。
    8. 单击查看操作历史 ,显示操作成功后,部署拓扑页面可以看到emr-worker-1节点的flume已经是started状态。

      emr-worker-1节点启动成功后,开始启动第二个worker节点。 同样的方式,例如在worker-2节点启动flume,修改配置项。

      deploy_node_hostname 节点的hostname
      default-agent.sinks.default-sink.hdfs.path 对于高可用集群,使用 hdfs://emr-cluster/path 形式的地址
    9. 保存配置后,启动All Components,指定机器为emr-worker-2。
  2. Master实例配置并启动Flume Agent。

    例如在emr-header-1节点进行操作。

    配置agent如下:

    additional_sinks k1
    deploy_node_hostname emr-header-1
    default-agent.sources.default-source.type taildir
    default-agent.sinks.default-sink.type avro
    default-agent.channels.default-channel.type file
    新增配置如下:
    配置项
    default-agent.sources.default-source.filegroups f1
    default-agent.sources.default-source.filegroups.f1 /mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.*
    default-agent.sources.default-source.positionFile 存储 position file 的路径
    default-agent.channels.default-channel.checkpointDir 存储 checkpoint 的路径
    default-agent.channels.default-channel.dataDirs 存储 event 数据的路径
    default-agent.channels.default-channel.capacity 根据 hdfs roll 进行设置
    default-agent.sources.default-source.batchSize 2000
    default-agent.channels.default-channel.transactionCapacity 2000
    default-agent.sources.default-source.ignoreRenameWhenMultiMatching true
    default-agent.sinkgroups g1
    default-agent.sinkgroups.g1.sinks default-sink k1
    default-agent.sinkgroups.g1.processor.type failover
    default-agent.sinkgroups.g1.processor.priority.default-sink 10
    default-agent.sinkgroups.g1.processor.priority.k1 5
    default-agent.sinks.default-sink.hostname emr-worker-1 节点的IP
    default-agent.sinks.default-sink.port emr-worker-1 节点 Flume Agent 的 port
    default-agent.sinks.k1.hostname emr-worker-2 节点的 IP
    default-agent.sinks.k1.port emr-worker-2 节点 Flume Agent 的 port
    default-agent.sinks.default-sink.batch-size 2000
    default-agent.sinks.k1.batch-size 2000
    default-agent.sinks.k1.type avro
    default-agent.sinks.k1.channel default-channel

查看同步结果

使用HDFS命令,可以看到同步的数据被写入FlumeData.${timestamp}形式的文件中,其中timestamp为文件创建的时间戳。

查看日志

Flume agent日志的存放路径为/mnt/disk1/log/flume/default-agent/flume.log

查看监控信息

集群与服务管理页面提供了Flume agent的监控信息。通过在集群与服务管理页面单击Flume服务进行访问。mointor_info