This topic describes how to use Spark Streaming SQL to perform data analysis and interactive development on a Kafka data source.

CREATE TABLE syntax

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING kafka
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

Parameters

Parameter               | Description                                  | Required
subscribe               | The name of the associated Kafka topic.      | Yes
kafka.bootstrap.servers | The connection address of the Kafka cluster. | Yes
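
For example, the following statement creates a table on a Kafka topic by using only the required options. The table name, topic name, and broker addresses are placeholders that you must replace with your own values:

CREATE TABLE kafka_example
USING kafka
OPTIONS(
  `subscribe` 'test_topic',
  `kafka.bootstrap.servers` 'host1:9092,host2:9092'
);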

Enable SASL authentication for Kafka

  1. Create a Simple Authentication and Security Layer (SASL) user and grant permissions to the user. For more information, see Grant permissions to SASL users.
    Important Spark 2 does not allow you to specify a group ID for a Kafka data source. When a streaming job consumes Kafka data, a group ID prefixed with spark-kafka-source is automatically generated. Therefore, you must grant the SASL user permissions on that group. Otherwise, an error message similar to the following is returned: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-58002817-a288-4b72-bc60-0b109506f683-1465208736-driver-2.
  2. Configure the required parameters in the Streaming SQL statement to specify the authentication method. The following table describes the parameters.
    Parameter               | Description
    kafka.bootstrap.servers | A host and port pair in the xxx:9094 format. If you specify multiple host and port pairs, separate them with commas (,). Example: alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094.
    kafka.sasl.mechanism    | Set the value to PLAIN.
    kafka.security.protocol | Set the value to SASL_PLAINTEXT.
    kafka.sasl.jaas.config  | A value in the org.apache.kafka.common.security.plain.PlainLoginModule required username="xx" password="xx"; format. Set username and password to the username and password that you specified when you created the SASL user on the Kafka server.

  3. Create a table.
    CREATE TEMPORARY TABLE example
    USING kafka
    OPTIONS(
      `subscribe` 'topic',
      `serialization.format` '1',
      `kafka.sasl.mechanism` 'PLAIN',
      `kafka.security.protocol` 'SASL_PLAINTEXT',
      `kafka.sasl.jaas.config` 'org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test";',
      `kafka.bootstrap.servers` 'alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094'
    );
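  4. Consume the table in a streaming query. The following is a minimal sketch, not a definitive implementation: example_sink is a hypothetical sink table that you must create first, the query reads the raw Kafka value column, and the job-level settings (streaming.query.name and the per-query checkpoint location) are assumptions based on common EMR Spark Streaming SQL conventions, so verify them against your EMR version.
    -- example_sink is a hypothetical sink table; create it before you run this query.
    SET streaming.query.name=example_query;
    SET spark.sql.streaming.checkpointLocation.example_query=/tmp/spark/example_query;

    INSERT INTO example_sink
    SELECT value
    FROM example;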

Monitor the consumption latency of Kafka offsets

Spark Streaming SQL consumes Kafka data by using Structured Streaming. The consumed offsets are recorded in the checkpoint of the streaming job and are not committed back to the Kafka server. For more information, see Structured Streaming + Kafka Integration Guide.

After a streaming job is started, a Prometheus-compatible *.prom file is generated in the /mnt/disk1/log/spark-streaming directory. One file is generated for each application ID, and the file name is in the application_name-applicationId format. The file is automatically deleted when the streaming job completes or is terminated by the kill -15 command. The following metrics are provided to monitor the latency of Kafka offsets.

Category            | Metric                      | Description
Data volume         | num_input_rows              | The number of input rows in each batch.
Data volume         | input_rows_per_second       | The input rate of each batch, in rows per second.
Processing rate     | processed_rows_per_second   | The processing rate of each batch, in rows per second.
Calculation latency | durationMs_triggerExecution | The total processing duration of each batch.
Calculation latency | durationMs_setOffsetRange   | The time required to define the range of offsets for each batch.
Calculation latency | durationMs_wal_commit       | The time required to write the offsets of each batch to the checkpoint log.
Calculation latency | durationMs_getEndOffset     | The time required to obtain the end offset of each batch.
Calculation latency | durationMs_query_planning   | The time required to plan the incremental query of each batch.
Calculation latency | durationMs_get_batch        | The time required to read the data of a source in each batch.
Calculation latency | durationMs_add_batch        | The time required to write the data of each batch to a sink.
Offset latency      | lag                         | The difference between the most recent offset consumed by the streaming job and the most recent offset on the Kafka server, calculated for each partition of the same topic. That is, lag = kafka_endoffset - streaming_endoffset.
Offset latency      | streaming_endoffset         | The most recent offset consumed by the streaming job.
Offset latency      | kafka_endoffset             | The most recent offset on the Kafka server.
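
The metrics are written in the Prometheus text exposition format. The following sketch shows the general shape of the file contents; the values are illustrative, and the exact metric names and labels in a generated file may differ by EMR version:

# Illustrative values; the exact names and labels may differ by EMR version.
num_input_rows 1000
processed_rows_per_second 245.5
durationMs_triggerExecution 1200
lag 42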