This topic describes how to use Spark SQL to perform data analysis and interactive development on a Kafka data source.

CREATE TABLE syntax

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING kafka
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);
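To make the syntax concrete, the following minimal Python sketch renders a statement of this shape from a table name, optional columns, and a dict of options. The function name `render_create_table` is hypothetical, and the sketch only builds a string; it does not talk to Spark. It writes each option as a backquoted key followed by a single-quoted value, the same form that the SASL example in this topic uses. Values that would need escaping inside a single-quoted literal are rejected instead of guessing at Spark SQL escaping rules.

```python
def render_create_table(table, options, columns=None):
    """Render a CREATE TABLE ... USING kafka statement as a string.

    Hypothetical helper for illustration only; it does not execute SQL.
    """
    for value in options.values():
        # Refuse values that would need escaping in a '...' literal.
        if "'" in value or "\\" in value:
            raise ValueError(f"option value needs escaping: {value!r}")
    head = f"CREATE TABLE {table}"
    if columns:
        # Optional (columnName dataType, ...) list from the syntax above.
        head += "(" + ", ".join(f"{name} {dtype}" for name, dtype in columns) + ")"
    # One `key` 'value' pair per line, comma-separated, no trailing comma.
    body = ",\n  ".join(f"`{key}` '{value}'" for key, value in options.items())
    return f"{head}\nUSING kafka\nOPTIONS(\n  {body}\n);"


if __name__ == "__main__":
    print(render_create_table(
        "example",
        {"subscribe": "topic", "kafka.bootstrap.servers": "host1:9092"},
    ))
```

Here `host1:9092` is a placeholder address. Joining the options with `",\n  "` guarantees that the last option is not followed by a comma.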

Parameters

Parameter Description Required
subscribe The name of the associated Kafka topic. Yes
kafka.bootstrap.servers The connection address of the Kafka cluster. Yes
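Before you run the statement, you can check the two required parameters on the client side. The following Python sketch is an illustration under stated assumptions. The helper names `missing_required_options` and `parse_bootstrap_servers` are hypothetical. The sketch assumes that `kafka.bootstrap.servers` is the usual Kafka list of comma-separated `host:port` entries.

```python
REQUIRED_OPTIONS = ("subscribe", "kafka.bootstrap.servers")


def missing_required_options(options):
    """Return the required options that are absent or blank in `options`."""
    return [key for key in REQUIRED_OPTIONS if not options.get(key, "").strip()]


def parse_bootstrap_servers(value):
    """Split a comma-separated host:port list into (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        # rpartition keeps bracketed IPv6 hosts such as [::1] intact.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        if not 0 < int(port) < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        servers.append((host, int(port)))
    return servers


if __name__ == "__main__":
    opts = {"subscribe": "topic"}
    print(missing_required_options(opts))
    print(parse_bootstrap_servers("host-1:9094, host-2:9094"))
```

These checks cover only the format of the values. They cannot confirm that the topic exists or that the brokers are reachable.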

Enable SASL authentication for Kafka

  1. Create a Simple Authentication and Security Layer (SASL) user and grant permissions to the user. For more information, see Grant permissions to SASL users.
    Notice Spark 2 does not allow you to specify a group ID for a Kafka data source. When a streaming job consumes Kafka data, Spark automatically generates a group ID that starts with spark-kafka-source. Therefore, you must grant the SASL user permissions on groups whose names start with this prefix. Otherwise, an error message such as org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-58002817-a288-4b72-bc60-0b109506f683-1465208736-driver-2 is returned.
  2. Configure the required parameters in the Streaming SQL statement to specify the authentication method. The following table describes the parameters.
    Parameter Description
    kafka.bootstrap.servers Set this parameter to a host and port pair in the xxx:9094 format. If you specify multiple host and port pairs, separate them with commas (,).

    Example: alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094.

    kafka.sasl.mechanism Set the value to PLAIN.
    kafka.security.protocol Set the value to SASL_PLAINTEXT.
    kafka.sasl.jaas.config Set this parameter to a value in the org.apache.kafka.common.security.plain.PlainLoginModule required username="xx" password="xx"; format.

    Set username and password to the username and password that you specified when you created the SASL user on the Kafka server.

  3. Create a table.
    CREATE TEMPORARY TABLE example
     USING kafka
     OPTIONS(
      `subscribe` 'topic',
      `serialization.format` '1',
      `kafka.sasl.mechanism` 'PLAIN',
      `kafka.security.protocol` 'SASL_PLAINTEXT',
      `kafka.sasl.jaas.config` 'org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test";',
      `kafka.bootstrap.servers` 'alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094'
     );
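If you generate these statements from a script, the following minimal Python sketch assembles the SASL options from the parameter table in step 2. It builds the `kafka.sasl.jaas.config` value in the PlainLoginModule format shown there. The helper names `plain_jaas_config` and `sasl_plain_options` are hypothetical. The sketch leaves out `serialization.format` because that option is not part of the SASL parameter table. It rejects credentials that contain a double quote or a backslash, because either character would break the quoted JAAS value.

```python
def plain_jaas_config(username, password):
    """Build a kafka.sasl.jaas.config value for SASL/PLAIN."""
    for field in (username, password):
        # A quote or backslash would break the quoted JAAS option value.
        if '"' in field or "\\" in field:
            raise ValueError("credentials must not contain '\"' or '\\'")
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )


def sasl_plain_options(topic, bootstrap_servers, username, password):
    """Assemble the Kafka source options for SASL_PLAINTEXT + PLAIN."""
    return {
        "subscribe": topic,
        # Multiple host:port pairs are separated with commas.
        "kafka.bootstrap.servers": ",".join(bootstrap_servers),
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_PLAINTEXT",
        "kafka.sasl.jaas.config": plain_jaas_config(username, password),
    }


if __name__ == "__main__":
    opts = sasl_plain_options(
        "topic", ["host-1:9094", "host-2:9094"], "test", "test"
    )
    for key, value in opts.items():
        print(f"`{key}` '{value}'")
```

With the username and password set to test, the generated `kafka.sasl.jaas.config` value is identical to the one in the CREATE TEMPORARY TABLE example. The host names are placeholders.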

References

For more information about Kafka data sources, see Structured Streaming + Kafka Integration Guide.