本文介绍如何在 E-MapReduce 的 Hadoop 集群运行 Spark Streaming 作业,处理 Kafka 集群的数据。

编程参考

由于 E-MapReduce 上的 Hadoop 集群和 Kafka 集群都是基于纯开源版本软件,所以在编程使用上参考相应官方文档即可。

访问 Kerberos Kafka 集群

E-MapReduce 支持创建基于 Kerberos 认证的 Kafka 集群。当 Hadoop 集群作业需要访问 Kerberos Kafka 集群时,有两种使用方式:

  • 非 Kerberos Hadoop 集群:提供用于 Kafka 集群的 Kerberos 认证的 kafka_client_jaas.conf krb5.conf文件。
  • Kerberos Hadoop 集群: 基于 Kerberos 集群跨域互信,提供用于 Hadoop 集群的 Kerberos 认证的 kafka_client_jaas.confkrb5.conf文件。

以上两种方式都需要运行作业时提供 kafka_client_jaas.conf 文件,用于 Kerberos 认证。

kafka_client_jaas.conf文件格式如下:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/path/to/kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
说明
  • keytab文件的获取,请参见Kerberos 简介
  • krb5.conf文件,请从kafka集群的/etc/krb5.conf获取。

Spark Streaming访问Kerberos Kafka集群

将Kafka集群各个节点的长域名和IP信息,加入Hadoop集群各个节点的/etc/hosts中。长域名和IP信息可在/etc/hosts中获取,长域名形式为emr-xxx-x.cluster-xxx

当我们运行Spark Streaming 作业访问 Kerberos kafka 时,可以在spark-submit命令行参数中提供所需的kafka_client_jaas.confkafka.keytab文件。
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}//kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf --class  xx.xx.xx.KafkaSample --num-executors 2 --executor-cores 2 --executor-memory 1g --master yarn-cluster xxx.jar arg1 arg2 arg3
需要注意的是,kafka_client_jaas.conf文件中,keytab文件路径需要写相对路径,请严格按照如下keyTab配置项写法:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};