This topic provides the DDL syntax that is used to create a Message Queue for Apache RocketMQ source table, describes the parameters in the WITH clause, and provides data type mappings, attribute fields, and sample code.
What is Message Queue for Apache RocketMQ?
Message Queue for Apache RocketMQ is a distributed messaging middleware that is developed by Alibaba Cloud based on Apache RocketMQ. Message Queue for Apache RocketMQ features low latency, high concurrency, high availability, and high reliability. Message Queue for Apache RocketMQ provides the asynchronous decoupling and load shifting features for distributed application systems. It also supports features for Internet applications, including massive message accumulation, high throughput, and reliable retry.
Prerequisites
Resources are created in the Message Queue for Apache RocketMQ console. For more information, see Create resources.
Limits
Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Message Queue for Apache RocketMQ connectors.
DDL syntax
create table mq_source(
x varchar,
y varchar,
z varchar
) with (
'connector' = 'mq',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'pullIntervalMs' = '1000',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'startMessageOffset' = '1000',
'consumerGroup' = '<yourConsumerGroup>',
'fieldDelimiter' = '|'
);
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the source table. | Yes | Set the value to mq. |
topic | The name of the topic. | Yes | N/A. |
endpoint | The endpoint of Message Queue for Apache RocketMQ. | Yes | Message Queue for Apache RocketMQ supports the following types of endpoints: |
accessId | The AccessKey ID that is used to access the Message Queue for Apache RocketMQ instance. | Yes | N/A. |
accessKey | The AccessKey secret that is used to access the Message Queue for Apache RocketMQ instance. | Yes | N/A. |
consumerGroup | The name of the consumer group. | Yes | N/A. |
pullIntervalMs | The interval at which data is pulled. | Yes | Unit: milliseconds. |
nameServerSubgroup | The name server group. | No | Note: This parameter is supported only in VVR 2.1.1 to VVR 3.0.0. |
timeZone | The time zone of the instance. | No | Example: Asia/Shanghai. |
startTimeMs | The time to start reading data. | No | N/A. |
startMessageOffset | The offset from which message consumption starts. | No | If you configure this parameter, messages are consumed from the offset specified by this parameter. |
tag | The subscription tag. | No | If Message Queue for Apache RocketMQ is used as a data source, only a single tag can be read. |
lineDelimiter | The row delimiter used when a message block is parsed. | No | Default value: \n. |
fieldDelimiter | The field delimiter. | No | The delimiter varies based on the mode in which the terminal of Message Queue for Apache RocketMQ works. |
encoding | The encoding format. | No | Default value: utf-8. |
lengthCheck | The rule for checking the number of fields parsed from a row of data. | No | Default value: NONE. |
columnErrorDebug | Specifies whether debugging is enabled. | No | Default value: FALSE. If you set this parameter to TRUE, a log entry is displayed when a parsing exception occurs. |
instanceID | The ID of the instance. | No | The setting of the instanceID parameter is based on whether the Message Queue for Apache RocketMQ instance has a separate namespace: |
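As a rough illustration of how the optional parameters combine with the required ones, the following sketch subscribes to a single tag and starts reading from a point in time. The table name, column names, and timestamp value are placeholders. The sketch assumes that startTimeMs takes a Unix timestamp in milliseconds, which the parameter name suggests but the table above does not state.

```sql
-- Hypothetical source table: names and values are illustrative only.
create temporary table mq_source_filtered (
  order_id varchar,
  status varchar
) with (
  'connector' = 'mq',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'consumerGroup' = '<yourConsumerGroup>',
  'pullIntervalMs' = '1000',
  'tag' = '<yourTag>',               -- only a single tag can be read
  'startTimeMs' = '1672531200000',   -- assumed: epoch milliseconds (2023-01-01 00:00:00 UTC)
  'timeZone' = 'Asia/Shanghai',
  'fieldDelimiter' = '|',
  'columnErrorDebug' = 'true'        -- log an entry when a parsing exception occurs
);
```

The sketch sets startTimeMs rather than startMessageOffset so that the job has one starting point; the table above does not say how the two parameters interact.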
Data type mappings
Data type of Message Queue for Apache RocketMQ | Data type of Flink |
---|---|
STRING | VARCHAR |
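Because the mapping above lists only STRING to VARCHAR, fields that carry numbers are typically declared as VARCHAR and converted in the query. The following is a minimal sketch, assuming the mq_source table from the CSV sample in this topic (columns id, name, and gender) and assuming that every id value is a valid integer.

```sql
-- Convert the VARCHAR id column to BIGINT after the message is parsed.
-- How cast handles a non-numeric value depends on the Flink version and
-- its configuration, so validate or filter the input if that can occur.
select
  cast(id as bigint) as id,
  name,
  gender
from mq_source;
```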
Attribute fields
Field | Data type | Description |
---|---|---|
topic | VARCHAR METADATA VIRTUAL | The topic of a Message Queue for Apache RocketMQ message. |
queue-id | INT METADATA VIRTUAL | The ID of the queue in which a Message Queue for Apache RocketMQ message is placed. |
queue-offset | BIGINT METADATA VIRTUAL | The consumption offset of a Message Queue for Apache RocketMQ message. |
msg-id | VARCHAR METADATA VIRTUAL | The ID of a Message Queue for Apache RocketMQ message. |
store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which a Message Queue for Apache RocketMQ message is stored. |
born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which a Message Queue for Apache RocketMQ message is generated. |
keys | VARCHAR METADATA VIRTUAL | The keys of a Message Queue for Apache RocketMQ message. |
tags | VARCHAR METADATA VIRTUAL | The tags of a Message Queue for Apache RocketMQ message. |
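The attribute fields are declared as metadata columns alongside the regular columns. The sketch below assumes that hyphenated field names such as queue-id must be quoted with backticks, as is usual for Flink SQL identifiers. The table name and the regular column names are placeholders.

```sql
-- Hypothetical source table that exposes some attribute fields as columns.
create temporary table mq_source_meta (
  id varchar,
  name varchar,
  `topic` varchar metadata virtual,
  `queue-id` int metadata virtual,
  `queue-offset` bigint metadata virtual,
  `born-timestamp` timestamp(3) metadata virtual
) with (
  'connector' = 'mq',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'consumerGroup' = '<yourConsumerGroup>',
  'pullIntervalMs' = '1000'
);

-- Read the attribute fields together with the parsed message fields.
select `topic`, `queue-id`, `queue-offset`, `born-timestamp`, id, name
from mq_source_meta;
```

Only the attribute fields that a query needs have to be declared. Reading queue-id and queue-offset next to the payload can help trace a row back to the message it came from.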
Sample code
- CSV format
The following example shows a Message Queue for Apache RocketMQ message in the CSV format.
1,name,male
2,name,female
Note A Message Queue for Apache RocketMQ message can contain any number of data records. Multiple data records are separated by \n.
You can use the following DDL statement to declare a Message Queue for Apache RocketMQ source table in a Flink job:
create table mq_source(
id varchar,
name varchar,
gender varchar,
topic varchar metadata virtual
) with (
'connector' = 'mq',
'topic' = 'mq-test',
'endpoint' = '<yourEndpoint>',
'pullIntervalMs' = '1000',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'startMessageOffset' = '1000',
'consumerGroup' = 'mq-group',
'fieldDelimiter' = ','
);
- Binary format
create temporary table source_table (
mess varbinary
) with (
'connector' = 'mq',
'endpoint' = '<yourEndpoint>',
'pullIntervalMs' = '500',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'topic' = 'mq-test',
'consumerGroup' = 'mq-group'
);
create temporary table out_table (
commodity varchar
) with (
'connector' = 'print'
);
insert into out_table
select cast(mess as varchar)
from source_table;