This topic describes how to run a Spark Streaming job in an E-MapReduce (EMR) Hadoop cluster to process data in a Kafka cluster.
Background information
EMR Hadoop and Kafka clusters are based on open source software. Therefore, you can refer to the relevant official documentation during data development:
- Spark official documentation: streaming-kafka-integration and structured-streaming-kafka-integration
- EMR demo: GitHub
Methods to access Kafka clusters for which Kerberos authentication is enabled
- Hadoop cluster for which Kerberos authentication is disabled: Provide the kafka_client_jaas.conf and krb5.conf files that are used for the Kerberos authentication of the Kafka cluster.
- Hadoop cluster for which Kerberos authentication is enabled: Provide the kafka_client_jaas.conf and krb5.conf files that are used for the Kerberos authentication of the Hadoop cluster. The Kafka cluster can be authenticated based on the cross-domain trust feature of Kerberos authentication. For more information, see Cross-region access.
Both methods require you to provide the kafka_client_jaas.conf file for Kerberos authentication when you run a job. The file is in the following format:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/path/to/kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
- For information about how to obtain the keytab file, see Compatible with the MIT Kerberos authentication protocol.
- You can obtain the krb5.conf file from the /etc/ directory of the Kafka cluster, as sketched below.
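The sketch below is a minimal example of what the krb5.conf file may contain. The realm name EMR.12345.COM and the KDC host emr-header-1.cluster-12345 are assumptions based on the sample principal above; always use the actual file copied from the Kafka cluster.
[libdefaults]
    default_realm = EMR.12345.COM
    dns_lookup_kdc = false

[realms]
    EMR.12345.COM = {
        kdc = emr-header-1.cluster-12345
        admin_server = emr-header-1.cluster-12345
    }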
Use Spark Streaming to access a Kafka cluster for which Kerberos authentication is enabled
Add the long domain name and IP address of each node in the Kafka cluster to the /etc/hosts file on each node of the Hadoop cluster. You can obtain the long domain name and IP address of a node from the /etc/hosts file on that node. A long domain name is in the format of emr-xxx-x.cluster-xxx.
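For example, an entry added to the /etc/hosts file may look like the following line. The IP address and node name here are placeholders for illustration only:
192.168.0.10    emr-worker-1.cluster-12345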
Run the following spark-submit command to submit the job:
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
    --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf \
    --class xx.xx.xx.KafkaSample \
    --num-executors 2 --executor-cores 2 --executor-memory 1g \
    --master yarn-cluster \
    xxx.jar arg1 arg2 arg3
In this case, the kafka_client_jaas.conf file specifies the keytab file by using a relative path, because the --files option uploads kafka.keytab to the working directory of each container:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
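The following Scala sketch shows what a class such as KafkaSample could look like when it consumes data from the Kerberos-enabled Kafka cluster. It uses the spark-streaming-kafka-0-10 connector; the class name, argument order, batch interval, and the SASL_PLAINTEXT security protocol are assumptions for illustration, not the exact code of the EMR demo referenced in the Appendix.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSample {
  def main(args: Array[String]): Unit = {
    // Assumed arguments: <bootstrap servers> <topic> <consumer group ID>
    val Array(brokers, topic, groupId) = args

    val sparkConf = new SparkConf().setAppName("KafkaSample")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Kerberos-related consumer settings. The security protocol and the
    // Kafka service name must match the Kafka cluster configuration. The
    // JAAS and krb5.conf files are provided by the spark-submit command above.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka"
    )

    // Create a direct stream that subscribes to the topic.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array(topic), kafkaParams)
    )

    // Print the number of records received in each batch.
    stream.map(record => record.value).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
After the job is compiled into a JAR file, it can be submitted with the spark-submit command shown above; the security.protocol and sasl.kerberos.service.name settings, together with the distributed kafka_client_jaas.conf and krb5.conf files, allow the consumer to authenticate with Kerberos.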
Appendix
For the sample code, visit GitHub.