本文介绍如何使用Kafka数据源进行数据分析或者交互式开发。
建表语法
CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING kafka
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);
配置参数说明
参数 | 描述 | 是否必选 |
---|---|---|
subscribe | 关联的Kafka Topic名称。 | 是 |
kafka.bootstrap.servers | Kafka集群连接地址。 | 是 |
Kafka开启SASL认证
- 开通SASL权限,具体操作请参见SASL用户授权。
重要 Spark2 Kafka DataSource中不支持设置Kafka的group.id,streaming在消费Kafka数据的时候,每个Streaming作业会自动生成以"spark-kafka-source"为前缀的group.id。因此需要在Kafka服务端开通对该Group的权限,具体如下截图,否则会报类似错误
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-58002817-a288-4b72-bc60-0b109506f683-1465208736-driver-2
。 - 在Streaming SQL中添加以下参数来配置安全认证方式。
参数 描述 kafka.bootstrap.servers 参数值格式为 xxx:9094
。例如,alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094。
kafka.sasl.mechanism 固定值为PLAIN。 kafka.security.protocol 固定值为SASL_PLAINTEXT。 kafka.sasl.jaas.config 参数值格式为 org.apache.kafka.common.security.plain.PlainLoginModule required username="xx" password="xx";
其中,username和password是在Kafka服务端开通sasl用户填写的用户名和密码。
- 建表示例。
CREATE TEMPORARY TABLE example USING kafka OPTIONS( `subscribe` 'topic', `serialization.format` '1', `kafka.sasl.mechanism` 'PLAIN', `kafka.security.protocol` 'SASL_PLAINTEXT', `kafka.sasl.jaas.config` 'org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test";', `kafka.bootstrap.servers` 'alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094', );
Kafka offset消费延迟监控
由于Streaming SQL底层使用Structured Streaming SQL消费Kafka作业,默认情况下不会提交当前消费的offset到Kafka Server上,详情请参见Structured Streaming + Kafka Integration Guide。
Streaming作业运行之后,会自动在/mnt/disk1/log/spark-streaming下生成prometheus支持的*.prom文件,每个applicationid生成一个,文件名称格式是application_name -applicationId
,Streaming作业完成或者执行kill -15
命令的时候会自动删除该文件。您可以通过以下指标监控当前作业消费的进度延迟情况。
指标分类 | 指标名称 | 指标说明 |
---|---|---|
数据量 | num_input_rows | 每个batch输入数据量。 |
input_rows_per_second | 每个batch输入速率。 | |
处理速率 | processed_rows_per_second | 每个batch处理的行数。 |
计算延迟指标 | durationMs_triggerExecution | 每个batch总的处理时长。 |
durationMs_setOffsetRange | 每个batch要计算的offset范围处理时长。 | |
durationMs_wal_commit | 每个batch写入offset到checkpoint日志处理时长。 | |
durationMs_getEndOffset | 每个batch获取endoffset的处理时长。 | |
durationMs_query_planning | 每个batch增量查询计划处理时间。 | |
durationMs_get_batch | 每个batch获取source端数据处理时间。 | |
durationMs_add_batch | 每个batch写入sink端处理时间。 | |
offset延迟指标 | lag | 每个batch当前streaming的已经消费的最新endoffset和kafka当前的latest offset做差值,指标粒度是topic+partition。 |
streaming_endoffset | 当前streaming的消费的最新endoffset。 | |
kafka_endoffset | kafka当前的latest offset。 |