本文为您介绍消息队列RocketMQ版结果表DDL定义、WITH参数和示例代码等。
说明 RocketMQ Connector可以作为Stream作业和Batch作业的结果表使用。
什么是消息队列RocketMQ版
消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。Flink支持将消息队列MQ作为流式数据的输出。
前提条件
已创建了RocketMQ资源,详情请参见创建资源。
使用限制
仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列RocketMQ Connector。
DDL定义
create table mq_sink(
x varchar,
y varchar,
z varchar
) with (
'connector'='mq',
'topic'='<yourTopicName>',
'endpoint'='<yourEndpoint>',
'accessId'='<yourAccessId>',
'accessKey'='<yourAccessSecret>'
);
说明 MQ是非结构化存储格式的消息中间件,对于数据的Schema不提供强制定义,完全由业务层指定。Flink仅支持CSV和二进制格式的MQ消息。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 结果表类型。 | 是 | 固定值为mq 。
|
topic | topic名称。 | 是 | 无。 |
endpoint | 地址。 | 是 | 阿里云消息队列RocketMQ版接入地址支持以下两种类型:
|
accessId | AccessKey ID。 | 是 | 无。 |
accessKey | AccessKey Secret。 | 是 | 无。 |
producerGroup | 写入的群组。 | 是 | 无。 |
tag | 写入的标签。 | 否 | 支持设置多个tag,以逗号(,)进行分隔。 |
nameServerSubgroup | NameServer组。 | 否 |
说明 仅VVR 2.1.1 ~ VVR 3.0.0版本支持该参数,VVR 3.0.1及以后版本不支持该参数。
|
encoding | 编码类型。 | 否 | 默认值为utf-8 。
|
retryTimes | 写入的重试次数。 | 否 | 默认值为10。 |
sleepTimeMs | 重试间隔时间。 | 否 | 默认值为1000(毫秒)。 |
instanceID | MQ实例ID。 | 否 |
|
属性字段
字段名 | 字段类型 | 说明 |
---|---|---|
keys | VARCHAR METADATA | RocketMQ消息Keys。 |
tags | VARCHAR METADATA | RocketMQ消息Tags。 |
说明 仅实时计算引擎VVR 4.0.0及以上版本支持以上RocketMQ属性字段。
代码示例
- 创建RocketMQ结果表:
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessSecret>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>', 'tag'='<yourTagName>', 'encoding'='utf-8', 'retryTimes'='5', 'sleepTimeMs'='500' );
说明 如果您的MQ消息为二进制格式,则DDL中只能定义1个字段,且字段类型必须为VARBINARY。 - 创建将keys和tags字段指定为RocketMQ消息的key和tag的结果表:
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessSecret>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>', 'encoding'='utf-8', 'retryTimes'='5', 'sleepTimeMs'='500' );