This topic provides the DDL syntax that is used to create a Message Queue for Apache RocketMQ source table, describes the parameters in the WITH clause, and provides data type mappings, attribute fields, and sample code.

Note A Message Queue for Apache RocketMQ source table can be used in both streaming jobs and batch jobs.

What is Message Queue for Apache RocketMQ?

Message Queue for Apache RocketMQ is a distributed messaging middleware service that Alibaba Cloud developed based on Apache RocketMQ. It features low latency, high concurrency, high availability, and high reliability. It provides asynchronous decoupling and load shifting for distributed application systems. It also supports capabilities that Internet applications require, such as the accumulation of large numbers of messages, high throughput, and reliable retries.

Prerequisites

Resources are created in the Message Queue for Apache RocketMQ console. For more information, see Create resources.

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Message Queue for Apache RocketMQ connectors.

DDL syntax

create table mq_source(
  x varchar,
  y varchar,
  z varchar
) with (
  'connector' = 'mq',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '1000',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'startMessageOffset' = '1000',
  'consumerGroup' = '<yourConsumerGroup>',
  'fieldDelimiter' = '|'
);
Note Message Queue for Apache RocketMQ stores messages as unstructured data, so you do not need to define a schema for a topic in Message Queue for Apache RocketMQ. The schema is determined at the service layer, that is, by the columns that you declare in the Flink DDL statement. Flink can read Message Queue for Apache RocketMQ messages only in the CSV and binary formats.

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the source table. Yes Set the value to mq.
topic The name of the topic. Yes N/A.
endpoint The endpoint of Message Queue for Apache RocketMQ. Yes Message Queue for Apache RocketMQ supports the following types of endpoints:
  • For jobs that run on VVR 3.0.1 or later, use Transmission Control Protocol (TCP) endpoints. For more information, see Announcement on the settings of internal TCP endpoints. You can use one of the following methods to obtain the endpoints:
    • Internal endpoint of a Message Queue for Apache RocketMQ instance that resides in the classic network or a virtual private cloud (VPC) of Alibaba Cloud: Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. In the TCP Endpoint section, view the endpoint that corresponds to Internal Access.
    • Public endpoint of a Message Queue for Apache RocketMQ instance: Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. In the TCP Endpoint section, view the endpoint that corresponds to Internet Access.
  • For jobs that run on VVR of a version earlier than 3.0.1, use the following endpoints:
    • Internal endpoints for Message Queue for Apache RocketMQ instances that reside in the classic network or a VPC of Alibaba Cloud:
      • China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), and China (Hong Kong): onsaddr-internal.aliyun.com:8080.
      • Singapore (Singapore): ap-southeastaddr-internal.aliyun.com:8080.
      • UAE (Dubai): ons-me-east-1-internal.aliyuncs.com:8080.
      • India (Mumbai): ons-ap-south-1-internal.aliyuncs.com:8080.
      • Malaysia (Kuala Lumpur): ons-ap-southeast-3-internal.aliyun.com:8080.
    • Public endpoint of Message Queue for Apache RocketMQ: onsaddr-internet.aliyun.com:80.
accessId The AccessKey ID that is used to access the Message Queue for Apache RocketMQ instance. Yes N/A.
accessKey The AccessKey secret that is used to access the Message Queue for Apache RocketMQ instance. Yes N/A.
consumerGroup The name of the consumer group. Yes N/A.
pullIntervalMs The interval at which data is pulled. Yes Unit: milliseconds.
nameServerSubgroup The name server group. No
  • For the internal Message Queue for Apache RocketMQ service that resides in the classic network or a VPC of Alibaba Cloud, you must set this parameter to nsaddr4client-internal.
  • For the public Message Queue for Apache RocketMQ service, you do not need to configure this parameter.
Note Only VVR versions from 2.1.1 to 3.0.0 support this parameter. VVR 3.0.1 and later do not support this parameter.
timeZone The time zone of the instance. No Example: Asia/Shanghai.
startTimeMs The time to start reading data. No N/A.
startMessageOffset The offset from which messages start to be consumed. No If you configure this parameter, messages are consumed starting from the specified offset.
tag The subscription tag. No N/A.
lineDelimiter The row delimiter used when a message block is parsed. No Default value: \n.
fieldDelimiter The field delimiter. No The delimiter varies based on the mode in which the Message Queue for Apache RocketMQ terminal works:
  • In read-only mode, which is the default mode, the delimiter is u&'\0001' and is not visible.
  • In edit mode, the delimiter is displayed as ^A.
encoding The encoding format. No Default value: utf-8.
lengthCheck The rule for checking the number of fields parsed from a row of data. No Default value: NONE. Valid values:
  • NONE:
    • If the number of fields parsed from a row of data is greater than the number of fields declared in the table, data is extracted from left to right based on the order of the declared fields.
    • If the number of fields parsed from a row of data is less than the number of fields declared in the table, the row is skipped.
  • SKIP: If the number of fields parsed from a row of data differs from the number of fields declared in the table, the row is skipped.
  • EXCEPTION: If the number of fields parsed from a row of data differs from the number of fields declared in the table, an exception is reported.
  • PAD: Data is padded from left to right based on the order of the declared fields.
    • If the number of fields parsed from a row of data is greater than the number of fields declared in the table, data is extracted from left to right based on the order of the declared fields.
    • If the number of fields parsed from a row of data is less than the number of fields declared in the table, the missing fields are padded with null.
columnErrorDebug Specifies whether to enable debugging. No Default value: FALSE. If you set this parameter to TRUE, a log entry is printed when a parsing exception occurs.
instanceID The ID of the instance. No Whether you must specify this parameter depends on whether the Message Queue for Apache RocketMQ instance has a separate namespace:
  • If the instance has a separate namespace, you must specify the instanceID parameter.
  • If the instance does not have a separate namespace, you do not need to specify the instanceID parameter.
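The following DDL statement is a sketch of how the connection-related parameters in the preceding table fit together for a job that runs on a VVR version from 2.1.1 to 3.0.0 and reads over the internal network from an instance in the China (Hangzhou) region. The table name mq_source_internal, its columns, and all <your...> values are placeholders. Specify instanceID only if the instance has a separate namespace, and specify tag only if you want to filter messages by tag.

```sql
-- Sketch for VVR 2.1.1 to 3.0.0: nameServerSubgroup is not supported on VVR 3.0.1 or later.
create temporary table mq_source_internal (
  x varchar,
  y varchar
) with (
  'connector' = 'mq',
  'topic' = '<yourTopicName>',
  -- Internal endpoint for China (Hangzhou) on VVR versions earlier than 3.0.1.
  'endpoint' = 'onsaddr-internal.aliyun.com:8080',
  -- Required for internal access from the classic network or a VPC.
  'nameServerSubgroup' = 'nsaddr4client-internal',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'consumerGroup' = '<yourConsumerGroup>',
  -- Pull interval in milliseconds.
  'pullIntervalMs' = '1000',
  -- Only if the instance has a separate namespace.
  'instanceID' = '<yourInstanceId>',
  -- Optional: subscribe only to messages that carry this tag.
  'tag' = '<yourTag>'
);
```

For a job on VVR 3.0.1 or later, replace the endpoint with the TCP endpoint from the console and remove the nameServerSubgroup line.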

Data type mapping

Data type of Message Queue for Apache RocketMQ Data type of Flink
STRING VARCHAR
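Because message content is mapped only to VARCHAR, one way to work with typed values is to declare the columns as VARCHAR and convert them in the query. The sketch below assumes the three-column mq_source table from the DDL syntax section and a hypothetical print sink named typed_sink. How CAST handles a value that cannot be converted depends on the Flink version.

```sql
-- Hypothetical sink that only shows the result of the conversion.
create temporary table typed_sink (
  x_int int,
  y varchar
) with (
  'connector' = 'print'
);

-- Every field arrives as VARCHAR; convert it explicitly where another type is needed.
insert into typed_sink
select
  cast(x as int) as x_int,
  y
from mq_source;
```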

Attribute fields

Field Data type Description
topic VARCHAR METADATA VIRTUAL The topic of a Message Queue for Apache RocketMQ message.
queue-id INT METADATA VIRTUAL The ID of the queue in which a Message Queue for Apache RocketMQ message is placed.
queue-offset BIGINT METADATA VIRTUAL The consumption offset of a Message Queue for Apache RocketMQ message.
msg-id VARCHAR METADATA VIRTUAL The ID of a Message Queue for Apache RocketMQ message.
store-timestamp TIMESTAMP(3) METADATA VIRTUAL The time at which a Message Queue for Apache RocketMQ message is stored.
born-timestamp TIMESTAMP(3) METADATA VIRTUAL The time at which a Message Queue for Apache RocketMQ message is generated.
keys VARCHAR METADATA VIRTUAL The keys of a Message Queue for Apache RocketMQ message.
tags VARCHAR METADATA VIRTUAL The tags of a Message Queue for Apache RocketMQ message.
Note You can obtain the preceding attribute fields of Message Queue for Apache RocketMQ only if you use VVR 3.0.1 or later.
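As a sketch, the following DDL statement declares every attribute field listed above next to three CSV payload columns. It assumes that the connector accepts the standard Flink METADATA FROM syntax for metadata keys that contain hyphens. The table name mq_source_with_meta and the renamed columns (msg_keys, queue_id, queue_offset, msg_id, store_time, and born_time) are hypothetical.

```sql
-- Requires VVR 3.0.1 or later; connection values are placeholders.
create temporary table mq_source_with_meta (
  id varchar,
  name varchar,
  gender varchar,
  -- When the column name equals the metadata key, FROM can be omitted.
  topic varchar metadata virtual,
  tags varchar metadata virtual,
  -- Keys that contain hyphens, or that you want to rename, are mapped with METADATA FROM.
  msg_keys varchar metadata from 'keys' virtual,
  queue_id int metadata from 'queue-id' virtual,
  queue_offset bigint metadata from 'queue-offset' virtual,
  msg_id varchar metadata from 'msg-id' virtual,
  store_time timestamp(3) metadata from 'store-timestamp' virtual,
  born_time timestamp(3) metadata from 'born-timestamp' virtual
) with (
  'connector' = 'mq',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '1000',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'consumerGroup' = '<yourConsumerGroup>',
  'fieldDelimiter' = ','
);
```

The metadata columns can then be queried like ordinary columns, for example select msg_id, store_time, id from mq_source_with_meta;.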

Sample code

  • CSV format
    The following example shows a Message Queue for Apache RocketMQ message in the CSV format.
    1,name,male
    2,name,female
    Note A Message Queue for Apache RocketMQ message can contain any number of data records. Multiple data records are separated by \n.
    You can use the following DDL statement to declare a Message Queue for Apache RocketMQ source table in a Flink job:
    create table mq_source(
       id varchar,
       name varchar,
       gender varchar,
       topic varchar metadata virtual
    ) with (
       'connector' = 'mq',
       'topic' = 'mq-test',
       'endpoint' = '<yourEndpoint>',
       'pullIntervalMs' = '1000',
       'accessId' = '<yourAccessId>',
       'accessKey' = '<yourAccessSecret>',
       'startMessageOffset' = '1000',
       'consumerGroup' = 'mq-group',
       'fieldDelimiter' = ','
    );
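  • Binary format
    The following example reads each message as raw bytes into a VARBINARY column, casts the bytes to VARCHAR, and writes the result to a print sink: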
    create temporary table source_table (
      mess varbinary
    ) with (
      'connector' = 'mq',
      'endpoint' = '<yourEndpoint>',
      'pullIntervalMs' = '500',
      'accessId' = '<yourAccessId>',
      'accessKey' = '<yourAccessSecret>',
      'topic' = 'mq-test',
      'consumerGroup' = 'mq-group'
    );
    
    create temporary table out_table (
      commodity varchar
    ) with (
      'connector' = 'print'
    );
    
    insert into out_table
    select 
      cast(mess as varchar)
    from source_table;
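If some records in the CSV example can contain fewer fields than the table declares, the lengthCheck parameter controls what happens to those rows. The following sketch reuses the CSV table definition with lengthCheck set to PAD and columnErrorDebug enabled. With these settings, a record such as 3,name should be kept with gender set to null instead of being skipped, as it would be with the default value NONE. The table name mq_source_padded is hypothetical.

```sql
create temporary table mq_source_padded (
   id varchar,
   name varchar,
   gender varchar
) with (
   'connector' = 'mq',
   'topic' = 'mq-test',
   'endpoint' = '<yourEndpoint>',
   'pullIntervalMs' = '1000',
   'accessId' = '<yourAccessId>',
   'accessKey' = '<yourAccessSecret>',
   'consumerGroup' = 'mq-group',
   -- Fields are separated by commas; records use the default lineDelimiter (\n).
   'fieldDelimiter' = ',',
   -- Missing fields are padded with null; fields beyond the declared columns are ignored.
   'lengthCheck' = 'PAD',
   -- Print a log entry when a parsing exception occurs.
   'columnErrorDebug' = 'TRUE',
   'encoding' = 'utf-8'
);
```

Use SKIP instead of PAD if incomplete rows should be dropped, or EXCEPTION if they should fail the job so that malformed data is noticed early.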