本文为您介绍如何使用Upsert Kafka连接器。
背景信息
Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
作为源表,此连接器可以将Kafka中存储的数据转换为changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key,如果不存在相应的key,则该更新被视为INSERT。用表来类比,changelog流中的数据记录被解释为UPSERT,也称为INSERT或UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。
作为结果表,此连接器可以消费上游计算逻辑产生的changelog流。它会将INSERT或UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。
类别 | 详情 |
支持类型 | 源表和结果表 |
运行模式 | 流模式 |
数据格式 | avro、avro-confluent、csv、json和raw |
特有监控指标 |
|
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
您需要创建Kafka集群,详情请参见创建DataFlow Kafka集群或在Kafka创建资源。
您需要连接实时计算Flink与Kafka集群之间网络。Kafka on EMR可参见文档配置创建和管理专有网络和安全组概述,云消息队列 Kafka 版需要配置白名单。
使用限制
语法结构
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为upsert-kafka。
properties.bootstrap.servers
Kafka broker地址。
String
是
无
格式为
host:port,host:port,host:port
,以英文逗号(,)分割。properties.*
对Kafka客户端的直接配置。
String
否
无
Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过
'properties.allow.auto.create.topics' = 'false'
来禁用自动创建topic。不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖:
key.deserializer
value.deserializer
key.format
读取或写入Kafka消息key部分时使用的格式。
String
是
无
当使用该配置时,key.fields或key.fields-prefix配置是必填的。
参数取值如下:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
key.fields-prefix
为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。
String
否
无
该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。
说明使用该配置时,value.fields-include必须配置为EXCEPT_KEY。
value.format
读取或写入Kafka消息value部分时使用的格式。
String
是
无
该配置等同于format,因此format和value.format 只能配置其中一个,如果同时配置两个会产生冲突。
value.fields-include
在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。
String
是
ALL
参数取值如下:
ALL(默认值):所有列都会作为Kafka消息value部分处理。
EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理。
topic
读取或写入topic名称。
String
是
无
无。
结果表
参数
说明
数据类型
是否必填
默认值
备注
sink.parallelism
Kafka结果表算子的并发数。
Integer
否
上游算子的并发,由框架决定
无。
sink.buffer-flush.max-rows
缓存刷新前,最多能缓存多少条记录。
Integer
否
0(未开启)
当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。
说明如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。
sink.buffer-flush.interval
缓存刷新的间隔时间。
Duration
否
0(未开启)
单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如
'sink.buffer-flush.interval'='1 s'
。当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。
说明如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。
使用示例
源表
创建Kafka数据源表,源表中包含网站用户的浏览数据。
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );
结果表
创建Upsert Kafka结果表。
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );
将统计网站用户的浏览数据写入结果表中。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;