本文档为您介绍如何创建实时计算Flink版消息队列Kafka结果表,以及Kafka版本对应关系。
注意
- 本文仅适用于实时计算2.0及以上版本。
- 本文仅适用于独享模式。
- Kafka结果表支持写入自建Kafka集群,但需注意版本对应关系,以及自建集群和实时计算集群的网络环境配置。
什么是Kafka结果表
消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。实时计算采用Kafka作为流式数据的数据源表或结果表。
DDL定义
消息队列Kafka结果表需要定义的DDL如下。
create table sink_kafka (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka010',
topic = '<yourTopicName>',
bootstrap.servers = '<yourServerAddress>'
);
说明
- 创建Kafka结果表时,必须明文指定
PRIMARY KEY (messageKey)
。 - 仅Blink 2.2.6及以上版本,支持阿里云Kafka或自建Kafka的TPS和RPS等指标信息的显示。
WITH参数
- 通用配置
参数 说明 是否必选 备注 type Kafka对应版本 是 必须是Kafka08、Kafka09、Kafka010、Kafka011中的一种,版本对应关系请参见Kafka版本对应关系。 topic Topic名称 是 无 - 必选配置
- Kafka08必选配置
参数 说明 zookeeper.connect zk连接ID - Kafka09/Kafka010/Kafka011必选配置
参数 说明 bootstrap.servers Kafka集群地址
- Kafka08必选配置
- 可选配置参数
consumer.id
socket.timeout.ms
fetch.message.max.bytes
num.consumer.fetchers
auto.commit.enable
auto.commit.interval.ms
queued.max.message.chunks
rebalance.max.retries
fetch.min.bytes
fetch.wait.max.ms
rebalance.backoff.ms
refresh.leader.backoff.ms
auto.offset.reset
consumer.timeout.ms
exclude.internal.topics
partition.assignment.strategy
client.id
zookeeper.session.timeout.ms
zookeeper.connection.timeout.ms
zookeeper.sync.time.ms
offsets.storage
offsets.channel.backoff.ms
offsets.channel.socket.timeout.ms
offsets.commit.max.retries
dual.commit.enabled
partition.assignment.strategy
socket.receive.buffer.bytes
fetch.min.bytes
Kafka版本对应关系
type | Kafka版本 |
---|---|
Kafka08 | 0.8.22 |
Kafka09 | 0.9.0.1 |
Kafka010 | 0.10.2.1 |
Kafka011 | 0.11.0.2及以上 |
示例
create table datahub_input (
id VARCHAR,
nm VARCHAR
) with (
type = 'datahub'
);
create table sink_kafka (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka010',
topic = '<yourTopicName>',
bootstrap.servers = '<yourServerAddress>'
);
INSERT INTO
sink_kafka
SELECT
cast(id as VARBINARY) as messageKey,
cast(nm as VARBINARY) as `message`
FROM
datahub_input;