本文为您介绍实时计算如何创建消息队列MQ源表以及创建过程涉及到的CSV类格式、WITH参数和类型映射。
说明 如果您需要使用带独立命名空间的MQ,请使用Blink 3.x作业版本。
什么是消息队列MQ
消息队列MQ是阿里云专业消息中间件,是企业级互联网架构的核心产品。实时计算可以将消息队列作为流式数据输入。
示例
create table mq_stream(
x varchar,
y varchar,
z varchar
) with (
type='mq',
topic='<yourTopicName>',
endpoint='<yourEndpoint>',
pullIntervalMs='1000',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
startMessageOffset='1000',
consumerGroup='<yourConsumerGroup>',
fieldDelimiter='|'
);
说明 MQ实际上是一个非结构化存储格式,对于数据的Schema不提供强制定义,完全由业务层指定。目前实时计算支持类CSV格式文本和二进制格式。
CSV格式
假设您的1条CSV格式消息记录如下。
1,name,male
2,name,female
说明 1条MQ消息可以包括0条到多条数据记录,记录之间使用
\n
分隔。
在实时计算作业中,声明MQ数据源表的DDL如下。
create table mq_stream(
id varchar,
name varchar,
gender varchar
) with (
type='mq',
topic='<yourTopicName>',
endpoint='<ourEndpoint>',
pullIntervalMs='1000',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
startMessageOffset='1000',
consumerGroup='<yourConsumerGroup>',
fieldDelimiter='|'
);
二进制格式
二进制格式测试代码如下。
create table source_table (
mess varbinary
) with (
type = 'mq',
endpoint = '<yourEndpoint>',
pullIntervalMs='500',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
topic = '<yourTopicName>',
consumerGroup='<yourConsumerGroup>'
);
create table out_table (
commodity varchar
) with (
type='print'
);
INSERT INTO out_table
SELECT
cast(mess as varchar)
FROM source_table;
说明
cast(mess as varchar)
需在实时计算2.0及以上版本使用,如果版本低于2.0,请先升级。- VARBINARY只能入参一次。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
type | 源表类型 | 是 | 固定值为mq。 |
topic | topic名称 | 是 | 无。 |
endPoint | endPoint地址 | 是 | 阿里云消息队列提供内网服务MQ(非公网region)和公网服务MQ(公网region)两种类型,请务必根据您购买的MQ的类型选择对应正确的接入地址(endPoint):
说明
|
accessId | AccessKey ID | 是 | 无 |
accessKey | AccessKey Secret | 是 | 无 |
consumerGroup | 订阅消费group名称 | 是 | 无 |
pullIntervalMs | 拉取时间间隔 | 否 | 单位为毫秒。 |
startTime | 消息消费启动的时间点 | 否 | 无 |
startMessageOffset | 消息开始的偏移量 | 否 | 如果填写,将优先以startMessageoffset的位点开始加载。 |
tag | 订阅的标签 | 否 | 无 |
lineDelimiter | 解析block时行分隔符 | 否 | 默认值为 \n 。
|
fieldDelimiter | 字段分隔符 | 否 | 默认值为\u0001 。表示在只读模式下以\u0001 (\u0001 在只读模式不可见)作为分隔符,在编辑模式下以^A 作为分隔符。
|
encoding | 编码格式 | 否 | 默认值为utf-8 。
|
lengthCheck | 单行字段条数检查策略 | 否 | 默认值为NONE,表示:
|
columnErrorDebug | 是否打开调试开关 | 否 | 默认值为FALSE。如果设置为TRUE,则打印解析异常的Log。 |
instanceID | MQ实例ID | 否 | 如果MQ实例无独立命名空间,则不可以使用instanceID参数。如果MQ实例有独立命名空间,则instanceID参数必选。 |
类型映射
MQ字段类型 | 实时计算字段类型 |
---|---|
STRING | VARCHAR |