全部产品
Search
文档中心

实时计算Flink版:消息队列Kafka

更新时间:Sep 12, 2024

本文为您介绍如何使用消息队列Kafka连接器。

背景信息

Apache Kafka是一款开源的分布式消息队列系统,广泛用于高性能数据处理、流式分析、数据集成等大数据领域。Kafka连接器基于开源Apache Kafka客户端,为阿里云实时计算Flink提供高性能的数据吞吐、多种数据格式的读写和精确一次语义的支持。

类别

详情

支持类型

源表和结果表

运行模式

流模式

数据格式

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

说明
  • 仅支持VVR 8.0.9及以上版本使用内置的Protobuf数据格式。

  • 以上支持的数据格式都有其对应的配置项,可直接在WITH参数中使用,详情请参见Flink社区文档

特有监控指标

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • 结果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

SQL和Datastream

是否支持更新或删除结果表数据

不支持更新和删除结果表数据,只支持插入数据。

说明

如果您需要更新和删除结果表数据,相关功能请参见Upsert Kafka

前提条件

您可以根据需求选择以下任意一种方式连接集群:

  • 连接阿里云云消息队列Kafka版集群

    • 云消息队列 Kafka 版集群已创建。详情请参见创建资源

    • Flink工作空间与Kafka集群处于同一VPC内,且云消息队列 Kafka 版已对Flink开放白名单,具体操作请参见配置白名单

    重要

    写入阿里云Kafka的限制:

    • 阿里云Kafka不支持zstd压缩格式写入。

    • 阿里云Kafka不支持幂等和事务写入,无法使用Kafka结果表提供的精确一次语义exactly-once semantic功能。在使用实时计算引擎VVR 8.0.0及以上时,需要在结果表中添加配置项properties.enable.idempotence=false以关闭幂等写入功能。阿里云Kafka的存储引擎对比与功能限制参见存储引擎对比

  • 连接自建Apache Kafka集群

    • 自建Apache Kafka集群版本在0.11及以上。

    • Flink与自建Apache Kafka集群之间的网络已打通。如何通过公网连接自建集群,详情请参见Flink全托管集群如何访问公网?

    • 仅支持Apache Kafka 2.8版本的客户端配置项,详情请参见Apache Kafka消费者生产者配置项文档。

使用限制

CREATE TABLE AS(CTAS)的使用限制

  • 仅Flink计算引擎vvr-4.0.12-flink-1.13及以上版本支持Kafka作为CREATE TABLE AS(CTAS)的同步数据源。

  • 仅支持JSON格式的类型推导和schema变更,其它数据格式暂不支持。

  • 仅支持Kafka中value部分的类型推导和表结构变更。如果您需要同步Kafka key部分的列,则需要您手动在DDL中进行指定。详情请参见示例三

网络连接排查

如果您的Flink作业在启动时出现Timed out waiting for a node assignment错误,一般是Flink和Kafka之间的网络连通问题导致的。

Kafka客户端与服务端建立连接的过程如下所示。

  1. 客户端使用您指定的bootstrap.servers地址连接Kafka服务端,Kafka服务端根据配置向客户端返回集群中各台broker的元信息,包括各台broker的连接地址。

  2. 客户端使用第一步broker返回的连接地址连接各台broker进行读取或写入。

如果Kafka服务端没有正确配置,客户端在第一步收到的连接地址有误,即使bootstrap.servers配置的地址可以连接上,也无法正常读取或写入数据。该问题经常在Flink与Kafka之间存在代理、端口转发、专线等网络转发机制时发生。

您可以按照以下步骤检查Kafka集群是否配置正确。

  1. 使用Zookeeper命令行工具(zkCli.sh或zookeeper-shell.sh)登录到您Kafka所使用的Zookeeper集群。

  2. 根据您的集群实际情况执行正确的命令,来获取您的Kafka broker元信息。通常可以使用get /brokers/ids/0命令来获取Kafka broker元信息。Kafka broker的连接地址位于endpoints字段中,该地址即为上述连接过程中服务端向客户端返回的连接地址,信息如下图所示。example

  3. 使用ping或telnet等命令来测试endpoint中显示的地址与Flink的连通性。如果无法连通该地址,请联系您的Kafka运维修改Kafka配置,为Flink单独配置listeners和advertised.listeners。

说明

更多关于Kafka客户端与服务端的连接信息,请参见Troubleshoot Connectivity

语法结构

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

元信息列

您可以在源表和结果表中定义元信息列,以获取或写入Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据。元信息列的使用示例如下。

CREATE TABLE kafka_source (
  --读取消息所属的topic作为`record_topic`字段
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  --读取ConsumerRecord中的时间戳作为`ts`字段
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  --读取消息的offset作为`record_offset`字段
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  --将`ts`字段中的时间戳作为ProducerRecord的时间戳写入Kafka
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

下表列出了Kafka源表和结果表所支持的元信息列。

Key

数据类型

说明

源表或结果表

topic

STRING NOT NULL METADATA VIRTUAL

Kafka消息所在的Topic名称。

源表

partition

INT NOT NULL METADATA VIRTUAL

Kafka消息所在的Partition ID。

源表

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Kafka消息的消息头(header)。

源表和结果表

leader-epoch

INT NOT NULL METADATA VIRTUAL

Kafka消息的Leader epoch。

源表

offset

BIGINT NOT NULL METADATA VIRTUAL

Kafka消息的偏移量(offset)。

源表

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

Kafka消息的时间戳。

源表和结果表

timestamp-type

STRING NOT NULL METADATA VIRTUAL

Kafka消息的时间戳类型:

  • NoTimestampType:消息中没有定义时间戳。

  • CreateTime:消息产生的时间。

  • LogAppendTime:消息被添加到Kafka Broker的时间。

源表

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为kafka。

    properties.bootstrap.servers

    Kafka broker地址。

    String

    格式为host:port,host:port,host:port,以英文逗号(,)分割。

    properties.*

    对Kafka客户端的直接配置。

    String

    后缀名必须是Kafka官方文档中定义的生产者消费者配置。

    Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过'properties.allow.auto.create.topics'='false'来禁用自动创建topic。

    不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖:

    • key.deserializer

    • value.deserializer

    format

    读取或写入Kafka消息value部分时使用的格式。

    String

    支持的格式

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    说明

    更多format参数设置请参见Format参数

    key.format

    读取或写入Kafka消息key部分时使用的格式。

    String

    支持的格式

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    说明

    使用该配置时,key.options配置是必填的。

    key.fields

    Kafka消息key部分对应的源表或结果表字段。

    String

    多个字段名以分号(;)分隔。例如field1;field2

    key.fields-prefix

    为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。

    String

    该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。

    说明

    使用该配置时,value.fields-include必须配置为EXCEPT_KEY。

    value.format

    读取或写入Kafka消息value部分时使用的格式。

    String

    该配置等同于format,因此formatvalue.format 只能配置其中一个,如果同时配置两个会产生冲突

    value.fields-include

    在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。

    String

    ALL

    参数取值如下:

    • ALL(默认值):所有列都会作为Kafka消息value部分处理

    • EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理

  • 源表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    topic

    读取的topic名称。

    String

    以英文分号 (;) 分隔多个topic名称,例如topic-1和topic-2

    说明

    topic和topic-pattern两个选项只能指定其中一个。

    topic-pattern

    匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被读取。

    String

    说明
    • 仅VVR 3.0.0及以上版本支持该参数。

    • topic和topic-pattern两个选项只能指定其中一个。

    properties.group.id

    消费组ID。

    String

    KafkaSource-{源表表名}

    如果指定的group id为首次使用,则必须将properties.auto.offset.reset设置为earliest或latest以指定首次启动位点。

    scan.startup.mode

    Kafka读取数据的启动位点。

    String

    group-offsets

    取值如下:

    • earliest-offset:从Kafka最早分区开始读取。

    • latest-offset:从Kafka最新位点开始读取。

    • group-offsets(默认值):从指定的properties.group.id已提交的位点开始读取。

    • timestamp:从scan.startup.timestamp-millis指定的时间戳开始读取。

    • specific-offsets:从scan.startup.specific-offsets指定的偏移量开始读取。

    说明

    该参数在作业无状态启动时生效。作业在从checkpoint重启或状态恢复时,会优先使用状态中保存的进度恢复读取。

    scan.startup.specific-offsets

    specific-offsets启动模式下,指定每个分区的启动偏移量。

    String

    例如partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    timestamp启动模式下,指定启动位点时间戳。

    Long

    单位为毫秒

    scan.topic-partition-discovery.interval

    动态检测Kafka topic和partition的时间间隔。

    Duration

    5分钟

    分区检查间隔默认为5分钟。需要显式地设置分区检查间隔为非正数才能关闭此功能。开启动态分区发现后,Kafka Source 可以自动地发现新增的分区并自动读取对应分区上的数据。在topic-pattern模式下,不仅读取已有topic的新增分区数据,也会读取符合正则匹配的新增topic的所有分区数据。

    说明

    在实时计算引擎VVR 6.0.x版本中,动态分区检测默认为关闭。自8.0版本起该功能默认打开,检测间隔默认设置为5分钟。

    scan.header-filter

    根据Kafka数据是否包含指定的消息头(Header)对数据进行条件过滤。

    String

    Header key和value使用冒号(:)分隔,多个header条件之间使用逻辑运算符(&、|)连接,支持取反逻辑运算符(!)。例如depart:toy|depart:book&!env:test表示保留header中包含depart=toy或depart=book,且不包含env=test的Kafka数据。

    说明
    • 仅实时计算引擎VVR 8.0.6及以上版本支持配置该参数。

    • 暂不支持括号运算。

    • 逻辑运算顺序为从左至右。

    • Header value会以UTF-8格式转换为字符串,与参数指定的header value进行比较。

    scan.check.duplicated.group.id

    是否检查通过properties.group.id指定的消费者组有重复。

    Boolean

    false

    参数取值如下:

    • true:在启动作业前检查消费者组是否有重复,如有重复作业将会报错,避免与现有的消费者组产生冲突。

    • false:直接启动作业,不检查消费者组冲突。

    说明

    仅VVR 6.0.4及以上版本支持该参数。

  • 结果表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    topic

    写入的topic名称。

    String

    sink.partitioner

    从Flink并发到Kafka分区的映射模式。

    String

    default

    取值如下:

    • default(默认值):使用Kafka默认的分区模式

    • fixed:每个Flink并发对应一个固定的Kafka分区。

    • round-robin:Flink并发中的数据将被轮流分配至Kafka的各个分区。

    • 自定义分区映射模式:如果fixed和round-robin不满足您的需求,您可以创建一个FlinkKafkaPartitioner的子类来自定义分区映射模式。例如org.mycompany.MyPartitioner

    sink.delivery-guarantee

    Kafka结果表的语义模式。

    String

    at-least-once

    取值如下:

    • none:不保证任何语义,数据可能会丢失或重复。

    • at-least-once(默认值):保证数据不丢失,但可能会重复。

    • exactly-once:使用Kafka事务保证数据不会丢失和重复。

    说明

    在使用exactly-once语义时,sink.transactional-id-prefix是必填的。

    sink.transactional-id-prefix

    在exactly-once语义下使用的Kafka事务ID前缀。

    String

    只有sink.delivery-guarantee配置为exactly-once时该配置才会生效。

    sink.parallelism

    Kafka结果表算子的并发数。

    Integer

    上游算子的并发,由框架决定。

  • CTAS同步数据源

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    json.infer-schema.flatten-nested-columns.enable

    是否递归式地展开JSON中的嵌套列。

    Boolean

    false

    参数取值如下:

    • true:递归式展开。对于被展开的列,Flink使用索引该值的路径作为名字。例如,对于JSON {"nested": {"col": true}} 中的列col,它展开后的名字为nested.col。

    • false(默认值):将嵌套类型当作String处理。

    json.infer-schema.primitive-as-string

    是否推导所有基本类型为String类型。

    Boolean

    false

    参数取值如下:

    • true:推导所有基本类型为String。

    • false:按照基本规则进行推导。

    所有Kafka consumer和producer支持的配置项均可在配置前添加properties.前缀后在WITH参数中使用。例如需要配置Kafka consumer或producer的超时时间request.timeout.ms为60000毫秒,则可以在WITH参数中配置'properties.request.timeout.ms'='60000'。Kafka consumer和Kafka producer的配置项详情请参见Apache Kafka官方文档

安全与认证

如果您的Kafka集群要求安全连接或认证,请将相关的安全与认证配置添加properties.前缀后设置在WITH参数中。配置Kafka表以使用PLAIN作为SASL机制,并提供JAAS配置的示例如下。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

使用SASL_SSL作为安全协议,并使用SCRAM-SHA-256作为SASL机制的示例如下。

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*SSL配置*/
  /*配置服务端提供的truststore (CA 证书) 的路径*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*如果要求客户端认证,则需要配置keystore (私钥) 的路径*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*客户端验证服务器地址的算法,空值表示禁用服务器地址验证*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*SASL配置*/
  /*将SASL机制配置为as SCRAM-SHA-256*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*配置JAAS*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

示例中提到的CA证书和私钥可使用实时计算控制台的资源上传功能上传至平台,上传后文件存放在/flink/usrlib目录下。例如,需要使用的CA证书文件名为my-truststore.jks,则上传后您可以在WITH参数中指定'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'来使用该证书。

说明
  • 上文中的示例仅适用于大多数配置情况。在配置Kafka连接器前,请与您的Kafka服务端运维人员联系,以获取正确的安全与认证配置信息。

  • 与开源Flink不同,实时计算Flink版的SQL编辑器默认已经对双引号(")进行转义处理,因此您在配置properties.sasl.jaas.config时无需对用户名和密码中的双引号(")添加额外的转义符(\)。

源表起始位点

启动模式

Kafka源表可通过配置scan.startup.mode来指定初始读取位点:

  • 最早位点(earliest-offset):从当前分区的最早位点开始读取。

  • 最末尾位点(latest-offset):从当前分区的最末尾位点开始读取。

  • 已提交位点(group-offsets):从指定group id的已提交位点开始读取,group id通过properties.group.id指定。

  • 指定时间戳(timestamp):从时间戳大于等于指定时间的第一条消息开始读取,时间戳通过scan.startup.timestamp-millis指定。

  • 特定位点(specific-offsets):从您指定的分区位点开始消费,位点通过scan.startup.specific-offsets指定。

说明
  • 如果您不指定启动位点,则默认会从已提交位点(group-offsets)启动消费。

  • scan.startup.mode只针对无状态启动的作业生效,有状态作业启动时会从状态中存储的位点开始消费。

代码示例如下:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  --从最早位点开始消费
  'scan.startup.mode' = 'earliest-offset',
  --从最末尾位点开始消费
  'scan.startup.mode' = 'latest-offset',
  --从消费者组"my-group"的已提交位点开始消费
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- 如果 "my-group" 为首次使用,则从最早位点开始消费
  'properties.auto.offset.reset' = 'latest', -- 如果 "my-group" 为首次使用,则从最末尾位点开始消费
  --从指定的毫秒时间戳1655395200000开始消费
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  --从指定位点开始消费
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

起始位点优先级

源表起始位点的优先级为:

  1. Checkpoint或Savepoint中存储的位点。

  2. 您在实时计算控制台指定的启动时间。

  3. 您在WITH参数中通过scan.startup.mode指定的启动位点。

  4. 未指定scan.startup.mode的情况下使用group-offsets

在以上任何一个步骤中,如果位点过期或Kafka集群发生问题等原因导致位点无效,则会使用properties.auto.offset.reset指定的策略进行位点重置,如果您未设置该配置项,则会产生异常要求您介入。

一种常见情况是使用全新的group id开始消费。首先源表会向Kafka集群查询该group的已提交位点,由于该group id是第一次使用,不会查询到有效位点,所以会通过properties.auto.offset.reset参数配置的策略进行重置。因此在使用全新group id进行消费时,必须配置properties.auto.offset.reset来指定位点重置策略。

源表位点提交

Kafka源表只在checkpoint成功后将当前消费位点提交至Kafka集群。如果您的checkpoint间隔设置较长,您在Kafka集群侧观察到的消费位点会有延迟。在进行checkpoint时,Kafka源表会将当前读取进度存储在状态中,并不依赖于提交到集群上的位点进行故障恢复,提交位点仅仅是为了在Kafka侧能够监控到读取进度,位点提交失败不会对数据正确性产生任何影响。

结果表自定义分区器

如果内置的Kafka Producer分区模式无法满足您的需求,您可以实现自定义分区模式将数据写入对应的分区。自定义分区器需要继承FlinkKafkaPartitioner,开发完成后编译JAR包,使用资源上传功能上传至实时计算控制台。上传并引用完成后,请在WITH参数中设置sink.partitioner参数,参数值为分区器完整的类路径,如org.mycompany.MyPartitioner

Kafka、Upsert Kafka或Kafka JSON catalog的选择

Kafka是一种只能添加数据的消息队列系统,无法进行数据的更新和删除操作,因此在流式SQL计算中无法处理上游的CDC变更数据和聚合、联合等算子的回撤逻辑。如果您需要将含有变更或回撤类型的数据写入Kafka,请使用对变更数据进行特殊处理的Upsert Kafka结果表。

为了方便您将上游数据库中一个或多个数据表中的变更数据批量同步到Kafka中,您可以使用Kafka JSON catalog。如果您的Kafka中存储的数据格式为JSON,使用Kafka JSON catalog可以省去定义schema和WITH参数的步骤。详情可参见管理Kafka JSON Catalog

作为CTAS数据源

CTAS语句支持将消息队列Kafka,且format为JSON的表作为数据源。在数据同步过程中,如果某些字段并未出现在预定义的表结构中,Flink会尝试自动推导该列的类型。如果自动推导的类型不能满足您的使用需求,您也可以通过辅助推导的方式对某些列的解析类型进行声明。

说明

关于JSON Format的详细描述,详情请参见JSON Format

  • 类型推导

    在类型推导过程中,Flink默认只展开JSON文本中的第一层数据,根据其类型和数值,按照基本规则进行类型推导。类型映射基本规则如下表所示。

    JSON类型

    Flink SQL类型

    BOOLEAN

    BOOLEAN

    STRING

    DATE、TIMESTAMP、TIMESTAMP_LTZ、TIME或 STRING

    INT或LONG

    BIGINT

    BIGINT

    DECIMAL或STRING

    说明

    Flink中DECIMAL的类型存在精度限制。因此,如果整数的实际取值超过了DECIMAL类型最大精度,Flink会自动推导其类型为STRING,避免精度的损失。

    FLOAT、DOUBLE或BIG DECIMAL

    DOUBLE

    ARRAY

    STRING

    OBJECT

    STRING

    示例

    • JSON文本

      {
        "id": 101,
        "name": "VVP",
        "properties": {
          "owner": "阿里云",
          "engine": "Flink"
        }
          "type": ["大数据"]
      }
    • Flink写入到下游存储的表信息为

      id

      name

      properties

      type

      101

      VVP

      {
           "owner": "阿里云",
           "engine": "Flink"
      }

      ["大数据"]

  • 辅助推导

    当基本规则不符合您的实际需要时,您可以在源表的DDL中声明特定列的解析类型。通过该方式,Flink会优先使用您声明的列类型去解析目标字段。针对以下示例,Flink会使用DECIMAL的方式去解析price字段,而不是使用默认的基本规则将其转换为DOUBLE类型。

    CREATE TABLE evolvingKafkaSource (
      price DECIMAL(18, 2)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'topic' = 'evolving_kafka_demo',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );

    但是,当您在DDL中指定的类型和实际数据中的类型不一致时,可以按照以下方式进行处理:

    • 在声明的类型比实际类型更宽泛时,以声明的类型自动去解析。例如,声明为DOUBLE,遇到的数据类型为BIGINT,则会以DOUBLE类型去解析。

    • 在实际的类型比声明的类型更为宽泛或者两种类型不兼容时,由于当前CTAS不支持类型变更,因此会报错提示您相关信息,您需要重新启动作业并声明准确的类型去解析数据。

      类型的宽泛的程度以及兼容性如下图所示。zongjie

      说明
      • 上图表示越靠近根节点,其类型越宽泛。如果两个类型在不同的分支上,则表示这两个类型不兼容。

      • 不支持辅助推导复杂类型,包括ROW、ARRAY、MAP和MULTISET。

      • 对于复杂类型,Flink在默认情况下会处理为STRING。

通常,Kafka topic中的JSON文本带有嵌套结构。如果您需要提取JSON文本中的嵌套列,则可以通过以下两种方式:

  • 在源表DDL中声明'json.infer-schema.flatten-nested-columns.enable'='true',来展开嵌套列中的所有元素至顶层。通过该方式,所有的嵌套列都会被依次展开。为了避免列名冲突,Flink采用索引到该列的路径作为展开后列名字。

    重要

    目前不支持解决列名冲突。如果发生列名冲突,请在源表的DDL中声明json.ignore-parse-errors为true,来忽略存在冲突的数据。

  • 在DDL中CTAS语法中添加计算列`rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。详情请参见示例四:同步表结构和数据并进行计算

使用示例

示例一:从Kafka中读取数据后写入Kafka

从名称为源表的Topic中读取Kafka数据,再写入名称为结果表的Topic,数据使用CSV格式。

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

示例二:同步表结构以及数据

将Kafka Topic中的消息实时同步到Hologres中。在该情况下,您可以将Kafka消息的offset和partition id作为主键,从而保证在Failover时,Hologres中不会有重复消息。

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    --可选,将嵌套列全部展开。
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

示例三:同步表结构以及Kafka消息的key和value数据

Kafka消息中的key部分已经存储了相关信息,您可以同时同步Kafka中的key和value。

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
说明

Kafka消息中的key部分不支持表结构变更和类型推导,需要您手动声明。

示例四:同步表结构和数据并进行计算

在同步Kafka数据到Hologres时,往往需要一些轻量级的计算。

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
--使用COALESCE处理空值情况。

Datastream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Kafka DataStream连接器

  • 构建Kafka Source

    Kafka Source提供了构建类来创建Kafka Source的实例。我们将通过以下示例代码为您介绍如何构建Kafka Source来消费input-topic最早位点的数据,消费组名称为my-group,并将Kafka消息体反序列化为字符串。

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    在构建KafkaSource时,必须指定以下参数。

    参数

    说明

    BootstrapServers

    Kafka Broker地址,通过setBootstrapServers(String)方法配置。

    GroupId

    消费者组ID,通过setGroupId(String)方法配置。

    Topics或Partition

    订阅的Topic或Partition名称。Kafka Source提供了以下三种Topic或Partition的订阅方式:

    • Topic列表,订阅Topic列表中所有Partition。

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • 正则表达式匹配,订阅与正则表达式所匹配的Topic下的所有Partition。

      KafkaSource.builder().setTopicPattern("topic.*")
    • Partition列表,订阅指定的Partition。

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
              new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    解析Kafka消息的反序列化器。

    反序列化器通过setDeserializer(KafkaRecordDeserializationSchema)来指定,其中KafkaRecordDeserializationSchema定义了如何解析Kafka的ConsumerRecord。如果只解析Kafka消息中的消息体(Value)的数据,则您可以通过以下任何一种方式实现:

    • 使用Flink提供的KafkaSource构建类中的setValueOnlyDeserializer(DeserializationSchema)方法,其中DeserializationSchema定义了如何解析Kafka消息体中的二进制数据。

    • 使用Kafka提供的解析器,包括多种实现类。例如,您可以使用StringDeserializer来将Kafka消息体解析成字符串。

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    说明

    如果需要完整地解析ConsumerRecord,则需要您自己实现KafkaRecordDeserializationSchema接口。

    在使用Kafka DataStream连接器时,您还需要了解以下Kafka属性:

    • 起始消费位点

      Kafka Source能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费。内置的位点初始化器包括以下内容。

      位点初始化器

      代码设置

      从最早位点开始消费。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      从最末尾位点开始消费。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      从时间戳大于等于指定时间的数据开始消费,单位为毫秒。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      从消费组提交的位点开始消费,不指定位点重置策略。

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      说明
      • 如果以上内置的初始化器不能满足需求,您也可以自己实现自定义的位点初始化器。

      • 如果您未指定位点初始化器,则默认使用OffsetsInitializer.earliest(),即最早位点。

    • 流模式和批模式

      Kafka Source支持流式和批式两种运行模式。默认情况下,Kafka Source设置为以流模式运行,因此作业永远不会停止,直到Flink作业失败或被取消。如果要配置Kafka Source在批模式下运行,您可以使用setBounded(OffsetsInitializer)指定停止偏移量,当所有分区都达到其停止偏移量时,Kafka Source会退出运行。

      说明

      通常,流模式下Kafka Source没有停止偏移量。为了方便对代码进行调试,流模式下您也可以使用 setUnbounded(OffsetsInitializer) 指定停止偏移量。请留意指定流模式和批模式停止偏移量的方法名(setUnbounded 和 setBounded)是不同的。

    • 动态分区检查

      为了在不重启Flink作业的情况下,处理Topic扩容或新建Topic等场景,您可以在提供的Topic或Partition订阅模式下,启用动态分区检查功能。

      说明

      默认开启动态分区检查功能,分区检查间隔默认为5分钟。需要显式地设置分区检查间隔为非正数才能关闭此功能。代码示例如下。

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // 每10秒检查一次新分区。
    • 事件时间和水印

      Kafka Source默认使用Kafka消息中的时间戳作为事件时间。您可以自定义水印策略(Watermark Strategy)以从消息中提取事件时间,并向下游发送水印。

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      如果您需要了解自定义水印策略(Watermark Strategy),请参见Generating Watermarks

    • 消费位点提交

      Kafka Source在Checkpoint完成时,提交当前的消费位点,以保证Flink的Checkpoint状态和Kafka Broker上的提交位点一致。如果未开启Checkpoint,Kafka Source依赖于Kafka Consumer内部的位点定时自动提交逻辑,自动提交功能由enable.auto.commit和 auto.commit.interval.ms两个Kafka Consumer配置项进行配置。

      说明

      Kafka Source不依赖于Broker上提交的位点来恢复失败的作业。提交位点只是为了上报Kafka Consumer和消费组的消费进度,以在Broker端进行监控。

    • 其他属性

      除了上述属性之外,您还可以使用setProperties(Properties) 和setProperty(String, String) 为Kafka Source和Kafka Consumer设置任意属性。KafkaSource通常有以下配置项。

      配置项

      说明

      client.id.prefix

      指定用于Kafka Consumer的客户端ID前缀。

      partition.discovery.interval.ms

      定义Kafka Source检查新分区的时间间隔。

      说明

      partition.discovery.interval.ms会在批模式下被覆盖为-1。

      register.consumer.metrics

      指定是否在Flink中注册Kafka Consumer的指标。

      其他Kafka Consumer配置

      Kafka Consumer的配置详情,请参见Apache Kafka

      重要

      Kafka Connector会强制覆盖部分您手动配置的参数项,覆盖详情如下:

      • key.deserializer始终被覆盖为ByteArrayDeserializer。

      • value.deserializer始终被覆盖为ByteArrayDeserializer。

      • auto.offset.reset.strategy被覆盖为OffsetsInitializer#getAutoOffsetResetStrategy()。

      以下示例为您展示如何配置Kafka Consumer,以使用PLAIN作为SASL机制并提供JAAS配置。

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • 监控

      Kafka Source在Flink中注册指标,用于监控和诊断。

      • 指标范围

        Kafka source reader的所有指标都注册在KafkaSourceReader指标组下,KafkaSourceReader是operator指标组的子组。与特定主题分区相关的指标注册在KafkaSourceReader.topic.<topic_name>.partition.<partition_id>指标组中。

        例如Topic "my-topic"、分区1的当前消费位点(currentOffset)注册在<some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset。成功提交位点的次数(commitsSucceeded)注册在<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded。

      • 指标列表

        指标名称

        描述

        范围

        currentOffset

        当前消费位点

        TopicPartition

        committedOffset

        当前提交位点

        TopicPartition

        commitsSucceeded

        成功提交的次数

        KafkaSourceReader

        commitsFailed

        失败的提交次数

        KafkaSourceReader

      • Kafka Consumer指标

        Kafka Consumer的指标注册在指标组KafkaSourceReader.KafkaConsumer。例如Kafka Consumer指标records-consumed-total注册在<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total。

        您可以使用配置项register.consumer.metrics配置是否注册Kafka消费者的指标。默认此选项设置为true。对于Kafka Consumer的指标,您可以参见Apache Kafka

  • 构建Kafka Producer

    Flink Kafka Producer可以实现将流数据写入一个或多个Kafka Topic。

    DataStream<String> stream = ...
    
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    
    FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
            "my-topic",                  // target topic
            new SimpleStringSchema(),    // serialization schema
            properties,                  // producer config
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
    
    stream.addSink(myProducer);

    您需要配置以下参数。

    参数

    说明

    Topic

    数据写入的默认Topic名称。

    数据序列化

    用于将数据序列化的SerializationSchema或KafkaSerializationSchema。

    Flink Kafka Producer需要知道如何将Java或Scala对象转换为二进制数据,KafkaSerializationSchema允许用户指定如何进行数据的序列化。ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp)方法会在每条数据流入的时候调用,来生成ProducerRecord写入Kafka。

    用户可以对每条数据如何写入Kafka进行细粒度地控制。通过ProducerRecord可以进行以下操作:

    • 设置写入的Topic名称。

    • 定义消息键(Key)。

    • 指定数据写入的Partition。

    Kafka客户端属性

    bootstrap.servers必填,以逗号分隔的Kafka Broker列表。

    容错语义

    启用Flink的Checkpoint后,Flink Kafka Producer可以保证精确一次的语义。除了启用Flink的Checkpoint外,您还可以通过Semantic参数来指定不同的容错语义,Semantic参数详情如下:

    • Semantic.NONE:Flink不做任何保证,数据可能会丢失或重复。

    • Semantic.AT_LEAST_ONCE(默认设置):保证不会丢失任何数据,但可能会重复。

    • Semantic.EXACTLY_ONCE:使用Kafka事务提供精确一次的语义保证。

      说明

      使用EXACTLY_ONCE语义时,需要注意的事项请参见EXACTLY_ONCE语义注意事项

EXACTLY_ONCE语义注意事项

  • 当使用事务写入Kafka时,请为所有消费Kafka数据的应用配置isolation.level参数。该参数取值如下:

    • read_committed:只读取已提交的数据。

    • read_uncommitted(默认值):可以读取未提交的数据。

  • Semantic.EXACTLY_ONCE模式依赖于在从某个Checkpoint恢复后,且在该Checkpoint开始之前所提交的事务。如果Flink作业崩溃与完成重启之间的时间大于Kafka的事务超时时间,则会有数据丢失,因为Kafka会自动中止超过超时时间的事务。因此,请根据您的预期停机时间适当地配置您的事务超时。

    Kafka Broker默认的transaction.max.timeout.ms设置为15分钟,Producer设置的事务超时不能超过Broker指定的时间。Flink Kafka Producer默认会将Kafka Producer配置中的transaction.timeout.ms属性设置为1小时,因此在使用Semantic.EXACTLY_ONCE模式前,需要增加Broker端的transaction.max.timeout.ms值。

  • Semantic.EXACTLY_ONCE模式为在Flink Kafka Producer实例中使用一个固定大小的Kafka Producer池。每个Checkpoint会使用池中的一个Kafka Producer。如果并发的Checkpoint数量超过Producer池的大小,Flink Kafka Producer会抛出异常并使整个作业失败。请相应地配置Producer池大小和最大并发的Checkpoint数量。

  • Semantic.EXACTLY_ONCE会尽可能清除阻止Consumer从Kafka topic中读取数据的残留事务。但如果Flink作业在第一个 Checkpoint之前就出现故障,则在重启该作业后并不会保留重启前Producer池的信息。因此,在第一个Checkpoint完成之前缩减Flink作业的并行度是不安全的,即使要缩减并行度,也不能小于FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR。

  • 对于在read_committed模式下运行的Kafka Consumer,任何未结束(既未中止也未完成)的事务都将阻塞对该Kafka Topic的所有读取。如果您按照以下步骤进行了操作:

    1. 用户开启事务1并通过该事务写入了一些数据。

    2. 用户开启事务2并通过该事务写入了更多的数据。

    3. 用户提交事务2。

    即使来自事务2的数据已经提交,在事务1提交或中止之前,事务2的数据对消费者是不可见的。因此:

    • 在Flink作业正常工作期间,您可以预期写入Kafka topic的数据会有延迟,约为Checkpoint的平均间隔。

    • 在Flink作业失败的情况下,该作业正在写入的Topic将会阻塞Consumer的读取,直到作业重新启动或事务超时。

常见问题