All Products
Search
Document Center

E-MapReduce:Use Spark to access Kafka

Last Updated:Mar 25, 2026

Run a Spark Streaming or Spark SQL job in an E-MapReduce (EMR) Hadoop cluster to read data from a Kafka cluster, including Kafka clusters with Kerberos authentication enabled. EMR Hadoop and Kafka clusters are based on open source software, so you can refer to the official Spark documentation during development.

Prerequisites

Before you begin, ensure that you have:

  • An EMR Hadoop cluster and a Kafka cluster

  • The kafka_client_jaas.conf, krb5.conf, and kafka.keytab files (required only if the Kafka cluster has Kerberos authentication enabled)

Access methods for Kerberos-enabled Kafka clusters

EMR supports creating Kafka clusters with Kerberos authentication enabled. The configuration files you need to provide depend on whether Kerberos authentication is also enabled on the Hadoop cluster.

Hadoop clusterFiles to provideHow authentication works
Kerberos disabledkafka_client_jaas.conf and krb5.conf from the Kafka clusterDirect Kerberos authentication against the Kafka cluster
Kerberos enabledkafka_client_jaas.conf and krb5.conf from the Hadoop clusterKafka cluster is authenticated via cross-realm trust

For cross-realm trust setup, see Configure cross-realm trust.

Prepare the Kerberos configuration files

kafka_client_jaas.conf — use the following template:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/path/to/kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};

To get the keytab file, see Configure MIT Kerberos authentication.

krb5.conf — copy from the /etc/ directory of the Kafka cluster.

Use Spark Streaming to access a Kerberos-enabled Kafka cluster

Step 1: Update the hosts file

Add the long domain name and IP address of each Kafka cluster node to /etc/hosts on every Hadoop cluster node. Long domain names follow the format emr-xxx-x.cluster-xxx. Find them in the /etc/hosts file of the corresponding Kafka node.

Step 2: Submit the Spark Streaming job

Pass the kafka_client_jaas.conf, kafka.keytab, and krb5.conf files to the spark-submit command using --files, and reference them in the JVM options:

spark-submit \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
  --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf \
  --class xx.xx.xx.KafkaSample \
  --num-executors 2 \
  --executor-cores 2 \
  --executor-memory 1g \
  --master yarn-cluster \
  xxx.jar arg1 arg2 arg3
Important

When you distribute the keytab file via --files, Spark copies it to the executor working directory. If keyTab in kafka_client_jaas.conf is an absolute path (as shown in the template above), the executor cannot locate the file and the job fails. Use a relative path instead:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};

Use Spark SQL to access Kafka

Start spark-sql with the EMR SDK jars that include the loghub data source type for Kafka:

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
Note

If your EMR cluster uses Spark 2, replace spark3 with spark2 in the path.

Create a table mapped to a Kafka topic and query it:

CREATE TABLE test_kafka
USING loghub
OPTIONS (
  kafka.bootstrap.servers = 'alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
  subscribe = 'test_topic',
  startingoffsets = 'earliest'
);

SELECT * FROM test_kafka;

Reference