全部产品
Search
文档中心

实时计算Flink版:Upsert Kafka

更新时间:Feb 06, 2024

本文为您介绍如何使用Upsert Kafka连接器。

背景信息

Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。

  • 作为源表,此连接器可以将Kafka中存储的数据转换为changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key,如果不存在相应的key,则该更新被视为INSERT。用表来类比,changelog流中的数据记录被解释为UPSERT,也称为INSERT或UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。

  • 作为结果表,此连接器可以消费上游计算逻辑产生的changelog流。它会将INSERT或UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。

类别

详情

支持类型

源表和结果表

运行模式

流模式和批模式

数据格式

avro、avro-confluent、csv、json和raw

特有监控指标

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • 结果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

API种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka连接器。

  • 仅支持读取和写入Apache Kafka 0.10及以上版本的数据。

  • 仅支持Apache Kafka 2.8版本的客户端配置项,详情请参见Apache Kafka消费者生产者配置项文档。

  • Upsert Kafka结果表在使用精确一次语义时,写入的Kafka集群必须开启事务功能,且仅支持Apache Kafka 0.11及以上版本的集群。

语法结构

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为upsert-kafka。

    properties.bootstrap.servers

    Kafka broker地址。

    String

    格式为host:port,host:port,host:port,以英文逗号(,)分割。

    properties.*

    对Kafka客户端的直接配置。

    String

    后缀名必须是Kafka官方文档中定义的生产者消费者配置。

    Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过'properties.allow.auto.create.topics' = 'false' 来禁用自动创建topic。

    不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖:

    • key.deserializer

    • value.deserializer

    key.format

    读取或写入Kafka消息key部分时使用的格式。

    String

    当使用该配置时,key.fieldskey.fields-prefix配置是必填的。

    参数取值如下:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。

    String

    该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。

    说明

    使用该配置时,value.fields-include必须配置为EXCEPT_KEY。

    value.format

    读取或写入Kafka消息value部分时使用的格式。

    String

    该配置等同于format,因此formatvalue.format 只能配置其中一个,如果同时配置两个会产生冲突。

    value.fields-include

    在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。

    String

    ALL

    参数取值如下:

    • ALL(默认值):所有列都会作为Kafka消息value部分处理。

    • EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理。

    topic

    读取或写入topic名称。

    String

    无。

  • 结果表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sink.parallelism

    Kafka结果表算子的并发数。

    Integer

    上游算子的并发,由框架决定

    无。

    sink.buffer-flush.max-rows

    缓存刷新前,最多能缓存多少条记录。

    Integer

    0(未开启)

    当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。

    说明

    如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个选项为大于零的值。

    sink.buffer-flush.interval

    缓存刷新的间隔时间。

    Duration

    0(未开启)

    单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如'sink.buffer-flush.interval'='1 s'

    当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。

    说明

    如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个选项为大于零的值。

使用示例

  • 源表

    创建Kafka数据源表,源表中包含网站用户的浏览数据。

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • 结果表

    • 创建Upsert Kafka结果表。

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • 将统计网站用户的浏览数据写入结果表中。

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCTuser_id)
      FROM pageviews
      GROUP BY user_region;

最佳实践