This topic describes how to create a Message Queue for Apache RocketMQ source table in Realtime Compute for Apache Flink. This topic also describes the comma-separated values (CSV) file format, parameters in the WITH clause, and data type mappings used when you create a Message Queue for Apache RocketMQ source table.
Introduction to Message Queue for Apache RocketMQ
Message Queue for Apache RocketMQ is a professional messaging middleware that is developed by Alibaba Cloud. It is a core service of the enterprise-level Internet architecture. You can specify Message Queue for Apache RocketMQ tables as source tables for Realtime Compute for Apache Flink to process streaming data.
Example
create table mq_stream(
x varchar,
y varchar,
z varchar
) with (
type='mq',
topic='<yourTopicName>',
endpoint='<yourEndpoint>',
pullIntervalMs='1000',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
startMessageOffset='1000',
consumerGroup='<yourConsumerGroup>',
fieldDelimiter='|'
);
CSV format
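Assume that a message contains the following data in CSV format:
1,name,male
2,name,female
A single message can contain multiple rows. Rows are separated by \n, which is the default value of lineDelimiter. The following DDL statement declares a source table for these rows: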
create table mq_stream(
id varchar,
name varchar,
gender varchar
) with (
type='mq',
topic='<yourTopicName>',
endpoint='<yourEndpoint>',
pullIntervalMs='1000',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
startMessageOffset='1000',
consumerGroup='<yourConsumerGroup>',
fieldDelimiter=','
);
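To check how the sample rows are parsed, the CSV source table can be forwarded to a result table of type print, as in the binary example. This is a minimal sketch; the result table name out_csv is hypothetical and is not part of the original example.

```sql
-- Hypothetical result table that prints each parsed row.
create table out_csv (
id varchar,
name varchar,
gender varchar
) with (
type='print'
);

-- Each row such as "1,name,male" is expected to appear as three fields.
INSERT INTO out_csv
SELECT id, name, gender
FROM mq_stream;
```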
Binary format
create table source_table (
mess varbinary
) with (
type = 'mq',
endpoint = '<yourEndpoint>',
pullIntervalMs='500',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
topic = '<yourTopicName>',
consumerGroup='<yourConsumerGroup>'
);
create table out_table (
commodity varchar
) with (
type='print'
);
INSERT INTO out_table
SELECT
cast(mess as varchar)
FROM source_table;
- The cast(mess as varchar) method is supported only in Realtime Compute for Apache Flink that uses Blink 2.0 or later. If your Blink version is earlier than 2.0, upgrade it first.
- Data of the VARBINARY type can be passed in only once.
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
type | The type of the source table. | Yes | Set the value to mq. |
topic | The name of a topic. | Yes | N/A. |
endPoint | The endpoint of Message Queue for Apache RocketMQ. | Yes | Two types of Message Queue for Apache RocketMQ services are available: internal and public. Select the endpoint that matches the type of service that you purchased. |
accessId | AccessKey ID | Yes | N/A. |
accessKey | AccessKey Secret | Yes | N/A. |
consumerGroup | The name of a consumer group. | Yes | N/A. |
pullIntervalMs | The interval at which messages are pulled. | No | Unit: milliseconds. |
startTime | The time when Realtime Compute for Apache Flink starts to consume messages. | No | N/A. |
startMessageOffset | The offset from which messages start to be consumed. | No | If you configure this parameter, messages are consumed starting from the specified offset. |
tag | The subscription tag. | No | N/A. |
lineDelimiter | The row delimiter that is used to split a message block into rows. | No | Default value: \n. |
fieldDelimiter | The field delimiter. | No | Default value: \u0001. In read-only mode, the delimiter is written as \u0001 and is invisible. In edit mode, it is written as ^A. |
encoding | The encoding format. | No | Default value: utf-8. |
lengthCheck | The policy that is used to check the number of fields parsed from a row of data. | No | Default value: NONE. |
columnErrorDebug | Specifies whether to enable debugging. | No | Default value: FALSE. If you set this parameter to TRUE, a log entry is printed when a parsing exception occurs. |
instanceID | The ID of the Message Queue for Apache RocketMQ instance. | No | You must configure this parameter if the instance has a separate namespace. Otherwise, leave it unset. |
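The following DDL statement is a sketch of a source table that also sets several of the optional parameters listed above. Values in angle brackets are placeholders, the table name mq_stream_with_options is hypothetical, and the values chosen for fieldDelimiter, encoding, lengthCheck, and columnErrorDebug are illustrative assumptions rather than recommendations. startTime is omitted because its value format is not described here.

```sql
-- Sketch only: values in angle brackets are placeholders.
create table mq_stream_with_options(
id varchar,
name varchar,
gender varchar
) with (
type='mq',
topic='<yourTopicName>',
endpoint='<yourEndpoint>',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
consumerGroup='<yourConsumerGroup>',
-- Required only if the instance has a separate namespace.
instanceID='<yourInstanceId>',
-- Subscribe only to messages that carry this tag.
tag='<yourTag>',
-- Split each row into comma-separated fields decoded as UTF-8.
fieldDelimiter=',',
encoding='utf-8',
-- Explicitly keep the default field-count check policy.
lengthCheck='NONE',
-- Print a log entry when a row fails to parse.
columnErrorDebug='TRUE'
);
```

Because lineDelimiter is not set, rows within a message are still split on the default \n.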
Data type mapping
Data type of Message Queue for Apache RocketMQ | Data type of Realtime Compute for Apache Flink |
---|---|
STRING | VARCHAR |