本文为您介绍消息队列Kafka结果表的DDL定义、WITH参数和示例。

什么是消息队列Kafka

消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。

Kafka结果表支持连接您自建开源Apache Kafka集群,如何通过公网连接自建集群详情请参见Flink全托管集群如何访问公网?

前提条件

连接DataFlow Kafka集群

连接消息队列Kafka版集群

  • 消息队列Kafka版集群已创建,详情请参见VPC接入
  • 实时计算Flink与Kafka集群处于同一VPC内,且阿里云消息队列Kafka版已对Flink开放白名单。可参考配置白名单来配置消息队列Kafka版的白名单。
说明 由于云存储版消息队列Kafka版不支持幂等和事务写入,您将无法使用Kafka结果表提供的精确一次语义(exactly-once semantic)功能。消息队列Kafka版的存储引擎对比与功能限制详情请参见存储引擎对比

连接自建Apache Kafka集群

网络连接排查

如果您的Flink作业在启动时出现Timed out waiting for a node assignment错误,一般是Flink和Kafka之间的网络连通问题导致的。Kafka客户端与服务端建立连接的过程如下:
  1. 客户端使用您指定的bootstrap.servers地址连接Kafka服务端,Kafka服务端根据配置向客户端返回集群中各台broker的元信息,包括各台broker的连接地址。
  2. 客户端使用第一步broker返回的连接地址连接各台broker进行读取或写入。
如果Kafka服务端没有正确配置,客户端在第一步收到的连接地址有误,即使bootstrap.servers配置的地址可以连接上,也无法正常读取/写入数据。该问题经常在Flink与Kafka之间存在代理、端口转发、专线等网络转发机制时发生。您可以按照以下步骤检查Kafka集群是否配置正确:
  1. 使用Zookeeper命令行工具(zkCli.sh或zookeeper-shell.sh)登录您的Kafka使用的Zookeeper集群。
  2. 根据您的集群实际情况执行正确的命令,来获取您的Kafka broker元信息。

    通常可以使用get /brokers/ids/0命令来获取Kafka broker元信息。Kafka broker的连接地址位于endpoints字段中,该地址即为上述Kafka客户端服务端连接过程中服务端向客户端返回的连接地址。如果该地址Flink无法连通,即使bootstrap.servers指定的地址是连通的,也无法读取或写入数据。

  3. 使用ping或telnet等命令来测试endpoint中显示的地址与Flink的连通性。

    如果无法连通该地址,请联系您的Kafka运维修改Kafka配置,为Flink单独配置listeners和advertised.listeners。

如果您想了解更多关于Kafka客户端与服务端的连接细节,详情请参见Kafka客户端与服务端网络连接过程详解

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka Connector。
  • 消息队列Kafka Connector仅支持将结果数据写入到kafka 0.10及以上版本。
  • 消息队列Kafka Connector仅支持Kafka 2.4的生产者配置项,详情请参见生产者配置项

DDL定义

create table kafka_sink(  
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)        
) with (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'csv'
);

WITH参数

参数说明是否必选数据类型备注
connector结果表类型。STRING固定值为kafka
topic结果表对应的Topic。STRING
properties.bootstrap.serversKafka Broker地址。STRING格式为host:port,host:port,host:port,以英文逗号(,)分割。
formatFlink Kafka Connector在序列化来自Kafka的消息时使用的格式。STRING格式取值如下:
  • csv
  • json
  • avro
sink.partitioner从Flink分区到Kafka分区的映射模式。STRING映射模式取值如下:
  • default(默认值):使用Kafka Producer默认的分区模式。
  • fixed:每个Flink分区对应至多一个Kafka分区。
  • round-robin:Flink分区中的数据将被轮流分配至Kafka的各个分区。
  • 自定义分区映射模式:如果fixed和round-robin不满足您的需求,您可以创建一个FlinkKafkaPartitioner的子类来自定义分区映射模式。参见下方“自定义分区器”一节。
Kafka Producer配置参数详情请参见Kafka官网生产者配置项列表。如果您还需要直接配置Connector使用的Kafka Producer,可以在Kafka Producer配置参数前添加properties前缀,并将该Kafka Producer配置信息追加至WITH参数。例如Kafka集群需要SASL(Simple Authentication and Security Layer)认证,代码示例如下。
CREATE TABLE kafkaTable (
    ...
) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
如果内置的Kafka Producer分区模式无法满足您的需求,您可以实现自定义分区模式将数据写入对应的分区。自定义分区器需要继承FlinkKafkaPartitioner ,开发完成后编译JAR包,使用资源上传功能上传至VVP平台,步骤如下。
  1. 登录实时计算控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的JAR包。
  5. 在目标作业开发页面附加依赖文件项,选择目标的JAR包。

上传并引用完成后,请在WITH参数中设置sink.partitioner参数,值为分区器完整的类路径,例如org.mycompany.MyPartitioner

从Kafka中读取数据后插入Kafka示例

从名称为source的Topic中读取Kafka数据,再写入名称为sink的Topic,数据使用CSV格式。
CREATE TEMPORARY TABLE kafka_source (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourTopicName>',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'properties.group.id' = '<yourPropertiesGroupid>',
    'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourTopicName>',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

常见问题

Flink和Kafka网络连通,但Flink无法消费或者写入数据?