本文为您介绍如何创建实时计算Flink版消息队列MQ结果表,以及创建过程涉及到的WITH参数。
重要
- 本文仅适用于Blink 1.4.5及以上版本。
- 如果您需要使用带独立命名空间的MQ,请使用Blink 3.x作业版本。
什么是消息列队MQ
消息队列MQ(Message Queue)是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品。消息列队基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)和监控报警等一套完整的消息云服务。
CSV格式
实时计算Flink版可以将消息队列作为流式数据进行输出,CSV格式输出示例如下。
CREATE TABLE stream_test_hotline_agent (
id INTEGER,
len BIGINT,
content VARCHAR
) WITH (
type='mq',
endpoint='<yourEndpoint>',
accessID='<yourAccessId>',
accessKey='<yourAccessSecret>',
topic='<yourTopicName>',
producerGroup='<yourGroupName>',
tag='<yourTagName>',
encoding='utf-8',
fieldDelimiter=',',
retryTimes='5',
sleepTimeMs='500'
);
二进制格式
实时计算Flink版可以将消息队列作为流式数据进行输出,二进制格式输出示例如下。
CREATE TABLE source_table (
commodity VARCHAR
)WITH(
type='random'
);
CREATE TABLE result_table (
mess VARBINARY
) WITH (
type = 'mq',
endpoint='<yourEndpoint>',
accessID='<yourAccessId>',
accessKey='<yourAccessSecret>',
topic='<yourTopicName>',
producerGroup='<yourGroupName>'
);
INSERT INTO result_table
SELECT
CAST(SUBSTRING(commodity,0,5) AS VARBINARY) AS mess
FROM source_table;
说明
cast(varchar as varbinary)
需要在Blink 2.0及以上版本使用。如果版本低于2.0,请先完成版本升级,详情请参见管理独享集群Blink版本。
WITH参数
参数 | 说明 | 备注 |
---|---|---|
type | 结果表类型 | 固定值为mq。 |
topic | Message Queue队列名称 | 无 |
endpoint | 地址 | 阿里云消息队列提供内网服务MQ(非公网region)和公网服务MQ(公网region)两种类型,请务必根据您购买的MQ的类型选择对应正确的接入地址:
重要
|
accessID | AccessKey ID | 无 |
accessKey | AccessKey Secret | 无 |
producerGroup | 写入的群组 | 无 |
tag | 写入的标签 | 可选,默认值为空。 |
fieldDelimiter | 字段分割符 | 可选,默认值为\u0001 。分隔符的使用情况如下所示:
|
encoding | 编码类型 | 可选,默认值为utf-8 。
|
retryTimes | 写入的重试次数 | 可选,默认值为10。 |
sleepTimeMs | 重试间隔时间 | 可选,默认值为1000(毫秒)。 |
instanceID | MQ实例ID |
|