本文介紹如何在E-MapReduce的Hadoop叢集運行Spark Streaming作業,處理Kafka叢集的資料。
背景資訊
E-MapReduce上的Hadoop叢集和Kafka叢集都是基於純開源軟體,相關編程使用方法可參見官方相應文檔。
- Spark官方文檔:streaming-kafka-integration和structured-streaming-kafka-integration。
- E-MapReduce-demo:github地址。
訪問Kerberos Kafka叢集
E-MapReduce支援建立基於Kerberos認證的Kafka叢集。當Hadoop叢集作業需要訪問Kerberos Kafka叢集時,有以下兩種使用方式:
- 非Kerberos Hadoop叢集:提供用於Kafka叢集的Kerberos認證的kafka_client_jaas.conf和krb5.conf檔案。
- Kerberos Hadoop叢集:基於Kerberos叢集跨域互信,提供用於Hadoop叢集的Kerberos認證的kafka_client_jaas.conf和krb5.conf檔案。
跨域互信詳細資料,請參見跨域互信。
以上兩種方式都需要運行作業時提供kafka_client_jaas.conf和krb5.conf檔案,用於Kerberos認證。
- kafka_client_jaas.conf檔案格式如下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="/path/to/kafka.keytab" principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM"; };說明 keytab檔案的擷取,請參見相容MIT Kerberos認證。 - krb5.conf檔案,請從Kafka叢集的/etc/目錄下擷取。
Spark Streaming訪問Kerberos Kafka叢集
添加Kafka叢集各個節點的長網域名稱和IP資訊至Hadoop叢集各個節點的/etc/hosts中。長網域名稱和IP資訊,您可以在/etc/hosts中擷取,長網域名稱形式為emr-xxx-x.cluster-xxx。
當運行Spark Streaming作業訪問Kerberos kafka時,可以在spark-submit命令列參數中提供所需的kafka_client_jaas.conf和kafka.keytab檔案。
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}//kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf --class xx.xx.xx.KafkaSample --num-executors 2 --executor-cores 2 --executor-memory 1g --master yarn-cluster xxx.jar arg1 arg2 arg3kafka_client_jaas.conf檔案中,keytab檔案路徑需要寫相對路徑,請嚴格按照如下keyTab配置項寫法。
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="kafka.keytab"
principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};spark-sql訪問Kafka
範例程式碼如下。
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*說明
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*中包含Kafka DataSource類型。如果您EMR叢集使用的是Spark2,則應修改上面命令中的spark3應該換成spark2。建立和查詢樣本如下。
create table test_kafka
using loghub
options(kafka.bootstrap.servers='alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
subscribe='test_topic',
startingoffsets='earliest'
)
select * from test_kafka;附錄
- 範例程式碼,請參見Spark對接Kafka。
- 更多參數的配置和含義,請參見Structured Streaming + Kafka Integration Guide。