This topic describes how to run a Spark Streaming job in an E-MapReduce (EMR) Hadoop cluster to process data in a Kafka cluster.

Background information

EMR Hadoop and Kafka clusters are built on open source software. Therefore, you can refer to the relevant official documentation during data development.

Methods to access Kafka clusters for which Kerberos authentication is enabled

EMR allows you to create Kafka clusters for which Kerberos authentication is enabled. You can run a job in a Hadoop cluster to access a Kafka cluster for which Kerberos authentication is enabled. The access method depends on whether Kerberos authentication is enabled for the Hadoop cluster:
  • Hadoop cluster for which Kerberos authentication is disabled: Provide the kafka_client_jaas.conf and krb5.conf files that are used for the Kerberos authentication of the Kafka cluster.
  • Hadoop cluster for which Kerberos authentication is enabled: Provide the kafka_client_jaas.conf and krb5.conf files that are used for the Kerberos authentication of the Hadoop cluster. The Kafka cluster can then be accessed based on the cross-realm trust feature of Kerberos authentication.

    For more information about the cross-realm trust feature, see Configure cross-realm trust.

Both methods require you to provide the kafka_client_jaas.conf and krb5.conf files to support Kerberos authentication when you run a job.

  • Content of the kafka_client_jaas.conf file:
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        serviceName="kafka"
        keyTab="/path/to/kafka.keytab"
        principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
    };
    Note For information about how to obtain the keytab file, see Configure MIT Kerberos authentication.
  • You can obtain the krb5.conf file from the /etc/ directory of a node in the Kafka cluster.

Use Spark Streaming to access a Kafka cluster for which Kerberos authentication is enabled

Add the long domain name and IP address of each node of the Kafka cluster to the /etc/hosts file on each node of the Hadoop cluster. You can obtain the long domain name and IP address of a node from the /etc/hosts file on that node. A long domain name is in the format of emr-xxx-x.cluster-xxx.
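For example, the entries added to the /etc/hosts file might look like the following. The IP addresses and the cluster ID are hypothetical placeholders; replace them with the values from your Kafka cluster.
10.0.xx.1    emr-header-1.cluster-xxxxx
10.0.xx.2    emr-worker-1.cluster-xxxxx
10.0.xx.3    emr-worker-2.cluster-xxxxx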

When you run a Spark Streaming job to access a Kafka cluster for which Kerberos authentication is enabled, specify the kafka_client_jaas.conf, krb5.conf, and kafka.keytab files in the spark-submit command line:
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
    --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf \
    --class xx.xx.xx.KafkaSample \
    --num-executors 2 --executor-cores 2 --executor-memory 1g \
    --master yarn-cluster \
    xxx.jar arg1 arg2 arg3
In the kafka_client_jaas.conf file, the path of the keytab file must be a relative path. Make sure that the keyTab parameter is configured in the following format:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
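The spark-submit command above references a sample main class (xx.xx.xx.KafkaSample) whose source code is not shown in this topic. The following Scala code is a minimal sketch of what such a Spark Streaming consumer might look like. It assumes the spark-streaming-kafka-0-10 connector and hypothetical command-line arguments for the broker list, topic, and consumer group ID. The security.protocol and sasl.kerberos.service.name settings are the Kafka client settings that are typically required for Kerberos (SASL) authentication; confirm them against your cluster configuration.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Object name matches the class specified by --class in the spark-submit command.
object KafkaSample {
  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: comma-separated broker list, topic name, consumer group ID.
    val Array(brokers, topic, groupId) = args

    val sparkConf = new SparkConf().setAppName("KafkaSample")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      // Settings for a Kerberos-enabled Kafka cluster. The JAAS and krb5 configurations
      // are supplied through the spark-submit options shown above.
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Set(topic), kafkaParams)
    )

    // Print the number of records received in each batch.
    stream.map((record: ConsumerRecord[String, String]) => record.value)
      .count()
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}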

Use Spark SQL statements to access Kafka

Sample command to start the Spark SQL CLI:
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
Note The /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* path contains the data source types that are required to access Kafka. If your EMR cluster uses Spark 2, change spark3 in the preceding command to spark2.
The following example shows how to create a table and query data from the table.
create table test_kafka
using kafka
options(
    kafka.bootstrap.servers='alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
    subscribe='test_topic',
    startingoffsets='earliest'
);

select * from test_kafka;
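For reference, the same topic can also be read from Scala code by using the DataFrame API instead of SQL statements. The following is a minimal sketch that is not part of the original sample code; it assumes Spark's built-in kafka data source and reuses the placeholder broker address and topic name from the preceding example.
import org.apache.spark.sql.SparkSession

object KafkaSqlSample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("KafkaSqlSample").getOrCreate()

    // Batch read of the topic; startingOffsets and endingOffsets bound the scan.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092")
      .option("subscribe", "test_topic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    // Kafka records are returned as binary key and value columns; cast them to strings before use.
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(10, truncate = false)

    spark.stop()
  }
}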

Appendix