本文为您介绍消息队列Kafka源表的DDL定义、WITH参数、元信息列和示例。
什么是Kafka源表
消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。
Kafka源表支持连接您自建开源Apache Kafka集群,如何通过公网连接自建Apache Kafka集群详情请参见Flink全托管集群如何访问公网?。
前提条件
连接DataFlow Kafka集群
- DataFlow Kafka集群已创建。如何创建DataFlow Kafka集群,详情请参见创建DataFlow Kafka集群。
- Flink与DataFlow Kafka集群处于同一VPC内,且DataFlow Kafka集群的安全组对Flink放通。详情请参见创建和管理专有网络和安全组概述。
连接消息队列Kafka版集群
- 消息队列Kafka版集群已创建。如何创建新的消息队列Kafka版集群,详情请参见步骤三:创建资源。
- 实时计算Flink与Kafka集群处于同一VPC内,且阿里云消息队列Kafka版已对Flink开放白名单。详情请参见配置白名单。
连接自建Apache Kafka集群
- 自建Apache Kafka集群版本在0.11及以上。
- Flink与自建Apache Kafka集群之间的网络已打通。如何通过公网连接自建集群详情请参见Flink全托管集群如何访问公网?。
网络连接排查
Timed out waiting for a node assignment
错误,一般是Flink和Kafka之间的网络连通问题导致的。Kafka客户端与服务端建立连接的过程如下:- 客户端使用您指定的bootstrap.servers地址连接Kafka服务端,Kafka服务端根据配置向客户端返回集群中各台broker的元信息,包括各台broker的连接地址。
- 客户端使用第一步broker返回的连接地址连接各台broker进行读取或写入。
- 使用Zookeeper命令行工具(zkCli.sh或zookeeper-shell.sh)登录您的Kafka使用的Zookeeper集群。
- 根据您的集群实际情况执行正确的命令,来获取您的Kafka broker元信息。
通常可以使用
get /brokers/ids/0
命令来获取Kafka broker元信息。Kafka broker的连接地址位于endpoints字段中,该地址即为上述Kafka客户端服务端连接过程中服务端向客户端返回的连接地址。如果该地址Flink无法连通,即使bootstrap.servers指定的地址是连通的,也无法读取或写入数据。 - 使用ping或telnet等命令来测试endpoint中显示的地址与Flink的连通性。
如果无法连通该地址,请联系您的Kafka运维修改Kafka配置,为Flink单独配置listeners和advertised.listeners。
如果您想了解更多关于Kafka客户端与服务端的连接细节,详情请参见Kafka客户端与服务端网络连接过程详解 。
使用限制
- 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka Connector。
- 消息队列Kafka Connector仅支持读取kafka 0.10及以上版本的数据。
- 消息队列Kafka Connector仅支持Kafka 2.4版本的消费者配置项,详情请参见消费者配置项。
- 仅Flink计算引擎vvr-4.0.12-flink-1.13及以上版本支持Kafka作为CTAS的同步数据源。
- 仅JSON format支持类型推导和Schema变更,其它format暂不支持。
- 在Kafka作为CTAS语句的数据源时,仅支持将JSON变更同步到Hudi和Hologres结果表。
- 仅支持Kafka中value部分的类型推导和表结构变更。如果您需要同步Kafka key部分的列,则需要您手动在DDL中进行指定。详情请参见示例三。
DDL定义
CREATE TABLE kafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
`topic` STRING METADATA VIRTUAL,
`partition` BIGINT METADATA VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'my_excellent_topic',
'properties.bootstrap.servers' = 'mykafka:9092',
'properties.group.id' = 'my_excellent_group'
'format' = 'csv',
'scan.startup.mode' = 'earliest-offset'
)
在实际使用中请根据实际情况配置字段名和WITH参数。元信息列
Key | 数据类型 | 说明 |
---|---|---|
topic | STRING NOT NULL METADATA VIRTUAL | Kafka消息所在的Topic名称。 |
partition | INT NOT NULL METADATA VIRTUAL | Kafka消息所在的Partition ID。 |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Kafka消息的消息头(header)。 |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka消息的Leader epoch。 |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka消息的偏移量(offset)。 |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka消息的时间戳。 |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka消息的时间戳类型:
|
- 仅VVR 3.0.0及以后版本支持定义元信息列。
- 如果您需要将消息队列Kafka作为结果表,且忽略只读元信息列时,则在源表中定义元信息列时,必须声明这些元信息列为VIRTUAL。
作为CTAS数据源
- 基本规则类型映射基本规则如下表所示。
JSON类型 Flink SQL类型 BOOLEAN BOOLEAN STRING DATE、TIMESTAMP、TIMESTAMP_LTZ、TIME或STRING INT或LONG BIGINT BIGINT DECIMAL或STRING 说明 由于Flink中DECIMAL的类型是有精度限制的。因此,如果整数的实际取值超过了DECIMAL类型最大精度,Flink会自动推导其类型为STRING,避免精度的损失。FLOAT、DOUBLE或BIG DECIMAL DOUBLE ARRAY STRING OBJECT STRING 示例- JSON文本
{ "id": 101, "name": "VVP", "properties": { "owner": "阿里云", "engine": "Flink" } "type": ["大数据"] }
- Flink写入到下游存储的表信息
id name properties type 101 "VVP" { "owner": "阿里云", "engine": "Flink" }
["大数据"]
- JSON文本
- 辅助推导如果您觉得以上基本规则不符合您实际需要,则您可以在源表的DDL中声明特定列的解析类型。通过该方式,Flink会优先使用您声明的列类型去解析目标字段。针对以下示例,Flink会使用DECIMAL的方式去解析price字段,而不是使用默认的基本规则将其转换为DOUBLE类型。
CREATE TABLE evolvingKafkaSource ( price DECIMAL(18, 2) ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'localhost:9092', 'topic' = 'evolving_kafka_demo', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );
但是,如果您在DDL中指定的类型和实际数据中的类型不一致时,则可以按照以下方式进行处理:- 在声明的类型比实际类型更宽泛时,以声明的类型自动去解析。
例如,声明为DOUBLE,遇到的数据类型为BIGINT,则会以DOUBLE类型去解析。
- 在实际的类型比声明的类型更为宽泛或者两种类型不兼容时,由于当前CTAS不支持类型变更,因此会报错提示您相关信息,您需要重新启动作业并声明准确的类型去解析数据。对于类型的宽泛的程度以及兼容性,可以总结为下图。
上图表示越靠近根节点,其类型越宽泛。如果两个类型在不同的分支上,则表示这两个类型不兼容。
说明- 不支持辅助推导复杂类型,包括ROW、ARRAY、MAP和MULTISET。
- 对于复杂类型,Flink在默认情况下会处理为STRING。
- 在声明的类型比实际类型更宽泛时,以声明的类型自动去解析。
- 在源表DDL中声明'json.infer-schema.flatten-nested-columns.enable'='true',来展开嵌套列中的所有元素至顶层。通过该方式,所有的嵌套列都会被依次展开。为了避免列名冲突,Flink采用索引到该列的路径作为展开后列名字。说明 目前不支持解决列名冲突。如果发生列名冲突,请在源表的DDL中声明json.ignore-parse-errors为true,来忽略存在冲突的数据。示例
- JSON文本
{ "nested": { "inner": { "col": true } } }
- Flink写入到下游存储的表信息
neseted.inner.col true
- JSON文本
- 在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。
详情请参见示例四。
WITH参数
参数 | 说明 | 是否必选 | 数据类型 | 备注 |
---|---|---|---|---|
connector | 源表类型。 | 是 | String | 固定值为kafka 。 |
topic | topic名称。 | 是 | String | 以分号 (;) 分隔多个topic名称,例如topic-1;topic-2 。重要 topic和topic-pattern两个选项只能指定其中一个。 |
topic-pattern | 匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被订阅。 | 否 | String | 重要
|
properties.bootstrap.servers | Kafka Broker地址。 | 是 | String | 格式为host:port,host:port,host:port ,以英文逗号(,)分割。 |
properties.group.id | Kafka消费组ID。 | 是 | String | 无。 |
properties.* | Kafka配置。 | 否 | String | 后缀名必须匹配为Kafka官方文档中定义的配置。Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用自动创建topic。不建议通过以上方式修改'key.deserializer'和'value.deserializer'参数,因为它们会被kafka配置覆盖。 |
format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String | 取值如下:
说明
|
value.format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String | 取值如下:
说明
|
key.format | 反序列化Kafka消息键(key)时使用的格式。 | 否 | String | 取值如下:
说明
|
key.fields | Kafka消息键(key)解析出来的数据存放的字段。 | 否 | String | 多个字段名以分号(;)分隔。例如field1;field2 。默认不配置该参数,因此key不会被解析,key数据将被丢弃。说明 仅VVR 3.0.0及以后版本支持该参数。 |
key.fields-prefix | 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 | 否 | String | 默认情况下前缀为空。如果定义了前缀,表结构和配置项key.fields都需要使用带前缀的名称。 当构建消息键字段时,前缀会被移除,将使用无前缀的名称。 重要
|
value.fields-include | 在解析消息体时,是否要包含消息键字段。 | 否 | String | 取值如下:
说明 仅VVR 3.0.0及以后版本支持该参数。 |
scan.startup.mode | Kafka读取数据的启动位点。 | 否 | String | 取值如下:
|
scan.startup.specific-offsets | 在specific-offsets启动模式下,指定每个分区的启动偏移量。 | 否 | String | 例如:partition:0,offset:42;partition:1,offset:300 |
scan.startup.timestamp-millis | 在timestamp启动模式下,指定启动位点时间戳。 | 否 | Long | 单位为毫秒。 |
value.fields-prefix | 为所有Kafka消息体(Value)指定自定义前缀,以避免与消息键(Key)或Metadata字段重名。 | 否 | String | 如果定义了前缀,表结构需要使用带前缀的名称。当构建消息键字段时,前缀会被移除,将使用无前缀的名称。 说明 仅Flink计算引擎VVR 4.0.12及之后版本支持该参数。 默认情况下前缀为空。 |
json.infer-schema.flatten-nested-columns.enable | 是否递归式地展开JSON中的嵌套列。 | 否 | Boolean | 参数取值如下:
说明 该参数仅在Kafka作为CTAS数据同步的数据源时生效。 |
json.infer-schema.primitive-as-string | 是否推导所有基本类型为String类型。 | 否 | Boolean | 参数取值如下:
说明 该参数仅在Kafka作为CTAS数据同步的数据源时生效。 |
scan.check.duplicated.group.id | 是否在作业启动时检查group id有无重复。 | 否 | Boolean | 参数取值如下:
|
properties
前缀,并将该Kafka Consumer配置信息追加至WITH参数。例如Kafka集群需要SASL(Simple Authentication and Security Layer)认证。CREATE TABLE kafkaTable (
...
) WITH (
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
消息格式Format
- csv
- json
- avro
- avro-confluent
- debezium-json
- canal-json
- maxwell-json
- raw
每个Format都有其对应的配置项,可以直接在WITH参数中使用。Format的配置项可参见每个Format对应的 Flink社区文档。
启动模式
- 最早位点(earliest-offset),从当前分区的最早位点开始读取。
- 最末尾位点(latest-offset),从当前分区的最末尾位点开始读取。
- 已提交位点(group-offsets),从指定group id的已提交位点开始读取,group id通过properties.group.id指定。
- 指定时间戳(timestamp),从时间戳大于等于指定时间的第一条消息开始读取,时间戳通过scan.startup.timestamp-millis指定。
- 特定位点(specific-offsets),从您指定的分区位点开始消费,位点通过scan.startup.specific-offsets指定。
- 如果您不指定启动位点,则默认会从已提交位点(group-offsets)启动消费。
- scan.startup.mode只针对无状态启动的作业生效,有状态作业启动时会从状态中存储的位点开始消费。
起始位点优先级
- Checkpoint或Savepoint中存储的位点。
- 您在VVP平台上指定的启动时间。
- 您在WITH参数中通过scan.startup.offset指定的启动位点。
- 未指定scan.startup.offset的情况下使用group-offsets。
在以上任何一个步骤中,如果位点过期或Kafka集群发生问题等原因导致位点无效,则会使用properties.auto.offset.reset指定的策略进行位点重置。如果您未设置该配置项,则会产生异常要求您介入。
一种常见情况是使用全新的group id开始消费。首先源表会向Kafka集群查询该group的已提交位点,由于该group id是第一次使用,不会查询到有效位点,所以会通过properties.auto.offset.reset参数配置的策略进行重置。因此在使用全新group id进行消费时,必须配置properties.auto.offset.reset来指定位点重置策略。
源表位点提交
Kafka源表只在checkpoint成功后将当前消费位点提交至Kafka集群。如果您的checkpoint间隔设置较长,您在Kafka集群侧观察到的消费位点会有延迟。在进行checkpoint时,Kafka源表会将当前读取进度存储在状态中,并不依赖于提交到集群上的位点进行故障恢复,提交位点仅仅是为了在Kafka侧能够监控到读取进度,位点提交失败不会对数据正确性产生任何影响。
代码示例
- 示例一:从Kafka中读取数据后插入Kafka。从名称为source的Topic中读取Kafka数据,再写入名称为sink的Topic,数据使用CSV格式。
CREATE TEMPORARY TABLE Kafka_source ( id INT, name STRING, age INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); CREATE TEMPORARY TABLE Kafka_sink ( id INT, name STRING, age INT ) WITH ( 'connector' = 'kafka', 'topic' = 'sink', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); INSERT INTO Kafka_sink SELECT id, name, age FROM Kafka_source;
- 示例二:同步表结构以及数据将Kafka Topic中的消息实时同步到Hologres中。在该情况下,您可以将Kafka消息的offset和partition id作为主键,从而保证在Failover时,Hologres中不会有重复消息。
CREATE TEMPORARY TABLE kafkaTable ( `offset` INT NOT NULL METADATA, `part` BIGINT NOT NULL METADATA FROM 'partition', PRIMARY KEY (`part`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'topic' = 'kafka_evolution_demo', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true' -- 可选,将嵌套列全部展开。 ); CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH ( 'connector' = 'hologres' ) AS TABLE vvp.`default`.kafkaTable;
- 示例三:同步表结构以及Kafka消息的key和value数据。Kafka消息中的key部分已经存储了相关信息,您可以同时同步Kafka中的key和value。
CREATE TEMPORARY TABLE kafkaTable ( `key_id` INT NOT NULL, `val_name` VARCHAR(200) ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'topic' = 'kafka_evolution_demo', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'val_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`( WITH ( 'connector' = 'hologres' ) AS TABLE vvp.`default`.kafkaTable;
说明 Kafka消息中的key部分不支持表结构变更和类型推导,需要您手动声明。 - 示例四:同步表结构和数据并进行计算。在同步Kafka数据到Hologres时,往往需要一些轻量级的计算。
CREATE TEMPORARY TABLE kafkaTable ( `distinct_id` INT NOT NULL, `properties` STRING, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE) ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'topic' = 'kafka_evolution_demo', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_' ); CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH ( 'connector' = 'hologres' ) AS TABLE vvp.`default`.kafkaTable ADD COLUMN `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default'); -- 使用COALESCE处理空值情况。