本文为您介绍消息队列Kafka结果表的DDL定义、WITH参数和示例。
什么是消息队列Kafka
消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。
Kafka结果表支持连接您自建开源Apache Kafka集群,如何通过公网连接自建集群详情请参见Flink全托管集群如何访问公网? 。
前提条件
连接DataFlow Kafka集群
- DataFlow Kafka集群已创建。可参考创建DataFlow Kafka集群创建新的DataFlow Kafka集群。
- Flink与DataFlow Kafka集群处于同一VPC内,且DataFlow Kafka集群的安全组对Flink放通。详情请参见创建和管理专有网络和安全组概述。
连接消息队列Kafka版集群
- 消息队列Kafka版集群已创建,详情请参见VPC接入。
- 实时计算Flink与Kafka集群处于同一VPC内,且阿里云消息队列Kafka版已对Flink开放白名单。可参考配置白名单来配置消息队列Kafka版的白名单。
说明 由于云存储版消息队列Kafka版不支持幂等和事务写入,您将无法使用Kafka结果表提供的精确一次语义(exactly-once semantic)功能。消息队列Kafka版的存储引擎对比与功能限制详情请参见存储引擎对比。
连接自建Apache Kafka集群
- 自建Apache Kafka集群版本在0.11及以上。
- Flink与自建Apache Kafka集群之间的网络已打通。如何通过公网连接自建集群详情请参见Flink全托管集群如何访问公网?。
网络连接排查
如果您的Flink作业在启动时出现
Timed out waiting for a node assignment
错误,一般是Flink和Kafka之间的网络连通问题导致的。Kafka客户端与服务端建立连接的过程如下:- 客户端使用您指定的bootstrap.servers地址连接Kafka服务端,Kafka服务端根据配置向客户端返回集群中各台broker的元信息,包括各台broker的连接地址。
- 客户端使用第一步broker返回的连接地址连接各台broker进行读取或写入。
如果Kafka服务端没有正确配置,客户端在第一步收到的连接地址有误,即使bootstrap.servers配置的地址可以连接上,也无法正常读取/写入数据。该问题经常在Flink与Kafka之间存在代理、端口转发、专线等网络转发机制时发生。您可以按照以下步骤检查Kafka集群是否配置正确:
- 使用Zookeeper命令行工具(zkCli.sh或zookeeper-shell.sh)登录您的Kafka使用的Zookeeper集群。
- 根据您的集群实际情况执行正确的命令,来获取您的Kafka broker元信息。
通常可以使用
get /brokers/ids/0
命令来获取Kafka broker元信息。Kafka broker的连接地址位于endpoints字段中,该地址即为上述Kafka客户端服务端连接过程中服务端向客户端返回的连接地址。如果该地址Flink无法连通,即使bootstrap.servers指定的地址是连通的,也无法读取或写入数据。 - 使用ping或telnet等命令来测试endpoint中显示的地址与Flink的连通性。
如果无法连通该地址,请联系您的Kafka运维修改Kafka配置,为Flink单独配置listeners和advertised.listeners。
如果您想了解更多关于Kafka客户端与服务端的连接细节,详情请参见Kafka客户端与服务端网络连接过程详解 。
使用限制
- 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka Connector。
- 消息队列Kafka Connector仅支持将结果数据写入到kafka 0.10及以上版本。
- 消息队列Kafka Connector仅支持Kafka 2.4的生产者配置项,详情请参见生产者配置项。
DDL定义
create table kafka_sink(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) with (
'connector' = 'kafka',
'topic' = '<yourTopicName>',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'format' = 'csv'
);
WITH参数
参数 | 说明 | 是否必选 | 数据类型 | 备注 |
---|---|---|---|---|
connector | 结果表类型。 | 是 | STRING | 固定值为kafka 。 |
topic | 结果表对应的Topic。 | 是 | STRING | 无 |
properties.bootstrap.servers | Kafka Broker地址。 | 是 | STRING | 格式为host:port,host:port,host:port ,以英文逗号(,)分割。 |
format | Flink Kafka Connector在序列化来自Kafka的消息时使用的格式。 | 是 | STRING | 格式取值如下:
|
sink.partitioner | 从Flink分区到Kafka分区的映射模式。 | 否 | STRING | 映射模式取值如下:
|
Kafka Producer配置参数详情请参见Kafka官网生产者配置项列表。如果您还需要直接配置Connector使用的Kafka Producer,可以在Kafka Producer配置参数前添加
properties
前缀,并将该Kafka Producer配置信息追加至WITH参数。例如Kafka集群需要SASL(Simple Authentication and Security Layer)认证,代码示例如下。CREATE TABLE kafkaTable (
...
) WITH (
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
如果内置的Kafka Producer分区模式无法满足您的需求,您可以实现自定义分区模式将数据写入对应的分区。自定义分区器需要继承FlinkKafkaPartitioner ,开发完成后编译JAR包,使用资源上传功能上传至VVP平台,步骤如下。
- 登录实时计算控制台
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击资源上传。
- 单击上传资源,选择您要上传的JAR包。
- 在目标作业开发页面附加依赖文件项,选择目标的JAR包。
上传并引用完成后,请在WITH参数中设置sink.partitioner参数,值为分区器完整的类路径,例如org.mycompany.MyPartitioner。
从Kafka中读取数据后插入Kafka示例
从名称为source的Topic中读取Kafka数据,再写入名称为sink的Topic,数据使用CSV格式。
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = '<yourTopicName>',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourPropertiesGroupid>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = '<yourTopicName>',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;