This topic describes how to use Spark Streaming SQL to perform data analysis and interactive development on a Kafka data source.
CREATE TABLE syntax
```sql
CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING kafka
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);
```
Parameters
Parameter | Description | Required |
---|---|---|
subscribe | The name of the associated Kafka topic. | Yes |
kafka.bootstrap.servers | The connection address of the Kafka cluster. | Yes |
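As an illustration, a minimal table definition that sets only the two required options could look like the following sketch. The topic name and broker address are placeholders; replace them with the values of your own Kafka cluster.

```sql
CREATE TABLE kafka_source
USING kafka
OPTIONS(
  -- Placeholder topic name and broker address.
  `subscribe` 'test_topic',
  `kafka.bootstrap.servers` '192.168.XX.XX:9092'
);
```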
Enable SASL authentication for Kafka
- Create a Simple Authentication and Security Layer (SASL) user and grant permissions to the user. For more information, see Grant permissions to SASL users.

  Important: Spark 2 does not allow you to specify a group ID in a Kafka data source. When a streaming job consumes Kafka data, a group ID prefixed with spark-kafka-source is automatically generated. Therefore, you must grant the SASL user permissions on that group. If you do not, an error message similar to the following is returned:

  ```
  org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-58002817-a288-4b72-bc60-0b109506f683-1465208736-driver-2
  ```

- Configure the required parameters in the Streaming SQL statement to specify the authentication method. The following table describes the parameters.
Parameter | Description |
---|---|
kafka.bootstrap.servers | Set this parameter to a host and port pair in the `xxx:9094` format. If you specify multiple host and port pairs, separate them with commas (,). Example: `alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094`. |
kafka.sasl.mechanism | Set the value to PLAIN. |
kafka.security.protocol | Set the value to SASL_PLAINTEXT. |
kafka.sasl.jaas.config | Set this parameter to a value in the `org.apache.kafka.common.security.plain.PlainLoginModule required username="xx" password="xx";` format. Set username and password to the username and password that you specified when you created the SASL user on the Kafka server. |
- Create a table. Note that the last option must not be followed by a trailing comma.

  ```sql
  CREATE TEMPORARY TABLE example
  USING kafka
  OPTIONS(
    `subscribe` 'topic',
    `serialization.format` '1',
    `kafka.sasl.mechanism` 'PLAIN',
    `kafka.security.protocol` 'SASL_PLAINTEXT',
    `kafka.sasl.jaas.config` 'org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test";',
    `kafka.bootstrap.servers` 'alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094'
  );
  ```
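After the source table is created, a streaming job can query it, for example by copying data to another Kafka table. In the following sketch, `example_sink` is a hypothetical sink table that must be created separately with its own `CREATE TABLE ... USING kafka` statement, and the query-name and checkpoint-location `SET` keys are assumptions that you should verify against your EMR version.

```sql
-- example_job and example_sink are hypothetical names used for illustration.
SET streaming.query.name=example_job;
SET spark.sql.streaming.checkpointLocation.example_job=/tmp/spark/example_job;

-- Read records from the SASL-authenticated source table and write them to the sink.
INSERT INTO example_sink
SELECT CAST(value AS STRING) AS value
FROM example;
```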
Consumption latency monitoring of Kafka offsets
Spark Streaming SQL uses Structured Streaming to consume data from Kafka. The consumed offsets are not committed back to the Kafka server. For more information, see Structured Streaming + Kafka Integration Guide.
After a streaming job is run, a *.prom file that Prometheus can read is generated in the /mnt/disk1/log/spark-streaming directory. One file is generated for each application ID, and the file name is in the `application_name-applicationId` format. When the streaming job is complete or the `kill -15` command is run on the job, the file is automatically deleted. The following metrics are provided to monitor the latency of Kafka offsets.
Category | Metric | Description |
---|---|---|
Data volume | num_input_rows | The number of input rows in each batch. |
Data volume | input_rows_per_second | The input rate of each batch, in rows per second. |
Processing rate | processed_rows_per_second | The number of rows processed per second in each batch. |
Calculation latency | durationMs_triggerExecution | The total processing duration of each batch. |
Calculation latency | durationMs_setOffsetRange | The time required to define the range of offsets in each batch. |
Calculation latency | durationMs_wal_commit | The time required to write the offsets of each batch to the checkpoint log. |
Calculation latency | durationMs_getEndOffset | The time required to obtain the end offset of each batch. |
Calculation latency | durationMs_query_planning | The time required to plan the incremental query in each batch. |
Calculation latency | durationMs_get_batch | The time required to obtain the data of a source in each batch. |
Calculation latency | durationMs_add_batch | The time required to write the data of each batch to a sink. |
Offset latency | lag | The difference between the most recent offset consumed by the current streaming job and the most recent offset of the Kafka topic. The difference is calculated per partition of the same topic. |
Offset latency | streaming_endoffset | The most recent offset consumed by the current streaming job. |
Offset latency | kafka_endoffset | The most recent offset of the Kafka topic. |