Run a Spark Streaming or Spark SQL job in an E-MapReduce (EMR) Hadoop cluster to read data from a Kafka cluster, including Kafka clusters with Kerberos authentication enabled. EMR Hadoop and Kafka clusters are based on open source software, so you can refer to the official Spark documentation during development.
Prerequisites
Before you begin, ensure that you have:
An EMR Hadoop cluster and a Kafka cluster
The kafka_client_jaas.conf, krb5.conf, and kafka.keytab files (required only if the Kafka cluster has Kerberos authentication enabled)
Access methods for Kerberos-enabled Kafka clusters
EMR supports creating Kafka clusters with Kerberos authentication enabled. The configuration files you need to provide depend on whether Kerberos authentication is also enabled on the Hadoop cluster.
| Hadoop cluster | Files to provide | How authentication works |
|---|---|---|
| Kerberos disabled | kafka_client_jaas.conf and krb5.conf from the Kafka cluster | Direct Kerberos authentication against the Kafka cluster |
| Kerberos enabled | kafka_client_jaas.conf and krb5.conf from the Hadoop cluster | Kafka cluster is authenticated via cross-realm trust |
For cross-realm trust setup, see Configure cross-realm trust.
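As a rough sketch of what that setup involves (realm names, KDC hosts, and domain suffixes below are placeholders, not values from a real deployment): the krb5.conf used by the Hadoop cluster lists both realms and maps the Kafka nodes' domain to the Kafka realm, and a matching krbtgt cross-realm principal must exist in both KDCs. The authoritative steps are in Configure cross-realm trust.

```
[realms]
  EMR.HADOOP-ID.COM = {
    kdc = emr-header-1.cluster-hadoop-id:88
  }
  EMR.KAFKA-ID.COM = {
    kdc = emr-header-1.cluster-kafka-id:88
  }

[domain_realm]
  .cluster-kafka-id = EMR.KAFKA-ID.COM
```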
Prepare the Kerberos configuration files
kafka_client_jaas.conf — use the following template:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="/path/to/kafka.keytab"
  principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
To get the keytab file, see Configure MIT Kerberos authentication.
krb5.conf — copy from the /etc/ directory of the Kafka cluster.
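Since the spark-submit step later references all three files by local path, it can help to confirm they are staged together in one directory first. A minimal sketch (the staging directory and the empty stand-in files are purely for illustration; on a real cluster these are the actual files you prepared above):

```shell
# Stand-in staging directory with empty placeholder files; on a real cluster
# these are the actual kafka_client_jaas.conf, kafka.keytab, and krb5.conf.
CONF_DIR=$(mktemp -d)
touch "$CONF_DIR/kafka_client_jaas.conf" "$CONF_DIR/kafka.keytab" "$CONF_DIR/krb5.conf"

# Fail fast if any of the three files is missing.
missing=0
for f in kafka_client_jaas.conf kafka.keytab krb5.conf; do
  if [ ! -f "$CONF_DIR/$f" ]; then
    echo "missing: $f"
    missing=1
  fi
done
[ "$missing" -eq 0 ] && echo "all three files staged in $CONF_DIR"
```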
Use Spark Streaming to access a Kerberos-enabled Kafka cluster
Step 1: Update the hosts file
Add the long domain name and IP address of each Kafka cluster node to /etc/hosts on every Hadoop cluster node. Long domain names follow the format emr-xxx-x.cluster-xxx. Find them in the /etc/hosts file of the corresponding Kafka node.
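The step above can be sketched as follows (the IP addresses and host names are placeholders, not values from a real cluster). The example appends each mapping only if it is not already present, and works on a scratch copy so it can be run safely; on a real Hadoop node you would set HOSTS_FILE=/etc/hosts instead:

```shell
# Placeholders: substitute each Kafka node's real IP and long domain name,
# taken from the /etc/hosts file of that Kafka node.
ENTRIES="192.168.0.10 emr-header-1.cluster-20555
192.168.0.11 emr-worker-1.cluster-20555"

# Demonstrated on a scratch copy; on a real node use HOSTS_FILE=/etc/hosts.
HOSTS_FILE=$(mktemp)

# Append each mapping only if it is not already present (idempotent).
echo "$ENTRIES" | while read -r entry; do
  grep -qF "$entry" "$HOSTS_FILE" || echo "$entry" >> "$HOSTS_FILE"
done

cat "$HOSTS_FILE"
```

Repeat for every node of the Hadoop cluster.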
Step 2: Submit the Spark Streaming job
Pass the kafka_client_jaas.conf, kafka.keytab, and krb5.conf files to the spark-submit command using --files, and reference them in the JVM options:
spark-submit \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
--files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf \
--class xx.xx.xx.KafkaSample \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 1g \
--master yarn-cluster \
xxx.jar arg1 arg2 arg3
When you distribute the keytab file via --files, Spark copies it to the working directory of each executor. If keyTab in kafka_client_jaas.conf is an absolute path (as shown in the template above), the executor cannot locate the file and the job fails. Use a relative path instead:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="kafka.keytab"
  principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
Use Spark SQL to access Kafka
Start spark-sql with the EMR SDK jars that include the loghub data source type for Kafka:
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
If your EMR cluster uses Spark 2, replace spark3 with spark2 in the path.
Create a table mapped to a Kafka topic and query it:
CREATE TABLE test_kafka
USING loghub
OPTIONS (
kafka.bootstrap.servers = 'alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
subscribe = 'test_topic',
startingoffsets = 'earliest'
);
SELECT * FROM test_kafka;
Reference
EMR demo code: GitHub
Sample Spark Streaming + Kafka code: GitHub
Spark Streaming + Kafka integration: spark.apache.org
Structured Streaming + Kafka integration: spark.apache.org