This topic describes how to create a Message Queue for Apache RocketMQ source table in Realtime Compute for Apache Flink. It also describes the supported message formats (CSV and binary), the parameters in the WITH clause, and the data type mappings that apply when you create a Message Queue for Apache RocketMQ source table.

Note If you use a Message Queue for Apache RocketMQ instance that has separate namespaces, use Blink 3.X.

Introduction to Message Queue for Apache RocketMQ

Message Queue for Apache RocketMQ is a professional messaging middleware that is developed by Alibaba Cloud. It is a core service of the enterprise-level Internet architecture. You can specify Message Queue for Apache RocketMQ tables as source tables for Realtime Compute for Apache Flink to process streaming data.

Example

create table mq_stream(
  x varchar,
  y varchar,
  z varchar
) with (
  type='mq',
  topic='<yourTopicName>',
  endpoint='<yourEndpoint>',
  pullIntervalMs='1000',
  accessId='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  startMessageOffset='1000',
  consumerGroup='<yourConsumerGroup>',
  fieldDelimiter='|'
);
Note Message Queue for Apache RocketMQ stores unstructured data. You do not need to define schemas for Message Queue for Apache RocketMQ source tables. Instead, schemas are specified at the business layer. Realtime Compute for Apache Flink supports messages in the CSV and binary formats.

CSV format

The following example shows a Message Queue for Apache RocketMQ message in the CSV format.
1,name,male 
2,name,female
Note The number of data records that a Message Queue for Apache RocketMQ message can contain is not limited. Multiple data records are separated by \n, which is the default value of the lineDelimiter parameter.
To declare a Message Queue for Apache RocketMQ source table in a Realtime Compute for Apache Flink job, you can use the following DDL statement:
create table mq_stream(
  id varchar,
  name varchar,
  gender varchar
) with (
  type='mq',
  topic='<yourTopicName>',
  endpoint='<yourEndpoint>',
  pullIntervalMs='1000',
  accessId='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  startMessageOffset='1000',
  consumerGroup='<yourConsumerGroup>',
  fieldDelimiter=','
);
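After the source table is declared, it can be queried like any other table. The following sketch reuses the mq_stream table above and writes the parsed rows to a print sink, in the same way as the binary example later in this topic. The sink name print_sink is hypothetical, and the filter is only for illustration.

```sql
-- Hypothetical sink that prints each row it receives.
create table print_sink (
  id varchar,
  name varchar,
  gender varchar
) with (
  type='print'
);

-- Each record in a message is parsed into one row of mq_stream;
-- this query forwards only the rows whose gender field is 'female'.
INSERT INTO print_sink
SELECT id, name, gender
FROM mq_stream
WHERE gender = 'female';
```

Rows whose field count does not match the declared columns are handled according to the lengthCheck parameter described below.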

Binary format

For messages in the binary format, you can use the following sample code to create a Message Queue for Apache RocketMQ source table:
create table source_table (
  mess varbinary
) with (
  type = 'mq',
  endpoint = '<yourEndpoint>',
  pullIntervalMs='500',
  accessId='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  topic = '<yourTopicName>',
  consumerGroup='<yourConsumerGroup>'
);

create table out_table (
  commodity varchar
) with (
  type='print'
);

INSERT INTO out_table
SELECT 
  cast(mess as varchar)
FROM source_table;
Note
  • The CAST(mess AS VARCHAR) conversion is supported only in Realtime Compute for Apache Flink that uses Blink 2.0 or later. If your Blink version is earlier than 2.0, upgrade it first.
  • Data of the VARBINARY type can be passed in only once.

Parameters in the WITH clause

Parameter Description Required Remarks
type The type of the source table. Yes Set the value to mq.
topic The name of a topic. Yes N/A.
endpoint The endpoint of Message Queue for Apache RocketMQ. Yes Two types of Message Queue for Apache RocketMQ services are provided: internal Message Queue for Apache RocketMQ and public Message Queue for Apache RocketMQ. Select an endpoint based on the type of Message Queue for Apache RocketMQ that you purchase.
  • For jobs that run on Blink 3.7.10 or later, use Transmission Control Protocol (TCP) endpoints. For more information, see Announcement on the settings of private TCP endpoints. You can use one of the following methods to obtain the endpoints:
    • Internal endpoints of Message Queue for Apache RocketMQ that resides in the Alibaba Cloud classic network or a virtual private cloud (VPC): Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the instance whose endpoint you want to obtain, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. In the TCP Endpoint section, you can view the endpoint that corresponds to Internal Access.
    • Public endpoint of Message Queue for Apache RocketMQ: Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the instance whose endpoint you want to obtain, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. In the TCP Endpoint section, you can view the endpoint that corresponds to Internet Access.
  • For jobs that run on Blink of a version earlier than 3.7.10, use the following endpoints:
    • Internal endpoints of Message Queue for Apache RocketMQ that resides in the Alibaba Cloud classic network or a VPC:
      • China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), and China (Hong Kong): onsaddr-internal.aliyun.com:8080.
      • Singapore (Singapore): ap-southeastaddr-internal.aliyun.com:8080.
      • UAE (Dubai): ons-me-east-1-internal.aliyuncs.com:8080.
      • India (Mumbai): ons-ap-south-1-internal.aliyuncs.com:8080.
      • Malaysia (Kuala Lumpur): ons-ap-southeast-3-internal.aliyun.com:8080.
    • Public endpoint of Message Queue for Apache RocketMQ: onsaddr-internet.aliyun.com:80.
Note
  • If you use a Message Queue for Apache RocketMQ connector in Blink of a version earlier than 3.7.10, you must update your Realtime Compute for Apache Flink job to Blink 3.7.10 or later and change the value of the endpoint parameter to the new TCP endpoint of Message Queue for Apache RocketMQ. This reduces the risk of instability or unavailability caused by the old endpoints. For more information, see November 1, 2021: The endpoints of Message Queue for Apache RocketMQ are unavailable and Realtime Compute for Apache Flink deployments need to be updated to adapt to the change.
  • Internal Message Queue for Apache RocketMQ does not support cross-region access. For example, if your Realtime Compute for Apache Flink service is located in the China (Hangzhou) region but your Message Queue for Apache RocketMQ service is located in the China (Shanghai) region, Realtime Compute for Apache Flink cannot access Message Queue for Apache RocketMQ.
  • The network security policies of Alibaba Cloud dynamically change. As a result, connection issues may occur when Realtime Compute for Apache Flink connects to the public Message Queue for Apache RocketMQ service. We recommend that you use the internal Message Queue for Apache RocketMQ service.
accessId AccessKey ID Yes N/A.
accessKey AccessKey Secret Yes N/A.
consumerGroup The name of a consumer group. Yes N/A.
pullIntervalMs The interval at which messages are pulled. No Unit: milliseconds.
startTime The time when Realtime Compute for Apache Flink starts to consume messages. No N/A.
startMessageOffset The offset from which messages start to be consumed. No If you configure this parameter, messages are consumed starting from the specified offset.
tag The subscription tag. No N/A.
lineDelimiter A row delimiter that is used to parse message blocks. No Default value: \n.
fieldDelimiter A field delimiter. No Default value: \u0001. This control character is invisible in read-only mode and is displayed as ^A in edit mode.
encoding The encoding format. No Default value: utf-8.
lengthCheck The policy that is used to check the number of fields parsed from a row of data. No Valid values: NONE, SKIP, EXCEPTION, and PAD. Default value: NONE.
  • NONE:
    • If the number of fields that are parsed from a row of data is greater than the number of declared fields, data is extracted from left to right based on the order of the declared fields.
    • If the number of fields that are parsed from a row of data is less than the number of declared fields, this row of data is skipped.
  • SKIP: If the number of fields that are parsed from a row of data is different from the specified number of fields, this row of data is skipped.
  • EXCEPTION: If the number of fields that are parsed from a row of data is different from the specified number of fields, an exception is reported.
  • PAD: Data is padded from left to right based on the order of specified fields.
    • If the number of fields that are parsed from a row of data is greater than the specified number of fields, data is extracted from left to right based on the order of specified fields.
    • If the number of fields that are parsed from a row of data is less than the specified number of fields, the values of the missing fields are padded with null.
columnErrorDebug Specifies whether to enable debugging. No Default value: FALSE. If you set this parameter to TRUE, a log entry is printed when a parsing exception occurs.
instanceID The ID of a Message Queue for Apache RocketMQ instance. No If the Message Queue for Apache RocketMQ instance does not have separate namespaces, you do not need to configure this parameter. If the Message Queue for Apache RocketMQ instance has separate namespaces, you must configure this parameter.
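The following sketch combines several of the optional parameters above in one DDL statement. The table name mq_stream_ns and every value in angle brackets are placeholders. A single tag is assumed, because the syntax for subscribing to multiple tags is not covered in this topic. The instanceID parameter is needed only for instances that have separate namespaces, which, as noted at the beginning of this topic, require Blink 3.X.

```sql
-- Sketch only: replace every placeholder with your own values.
create table mq_stream_ns(
  id varchar,
  name varchar,
  gender varchar
) with (
  type='mq',
  topic='<yourTopicName>',
  endpoint='<yourTcpEndpoint>',       -- TCP endpoint, for jobs on Blink 3.7.10 or later
  accessId='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  consumerGroup='<yourConsumerGroup>',
  instanceID='<yourInstanceId>',      -- only for instances with separate namespaces
  tag='<yourTag>',                    -- subscription tag
  fieldDelimiter=',',
  encoding='utf-8',
  lengthCheck='PAD',                  -- pad missing fields with null instead of skipping the row
  columnErrorDebug='TRUE'             -- print a log entry when a parsing exception occurs
);
```

PAD is chosen here only to show a non-default policy; SKIP or EXCEPTION may suit pipelines that must not receive partially populated rows.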

Data type mapping

Data type of Message Queue for Apache RocketMQ Data type of Realtime Compute for Apache Flink
STRING VARCHAR
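Because message fields map to VARCHAR, any conversion to another type can be done in the query. The following sketch assumes the mq_stream table declared in the CSV format section (columns id, name, and gender) and a hypothetical print sink named typed_sink. It also assumes that id always contains an integer string; how invalid values are handled by CAST is not covered in this topic.

```sql
-- Hypothetical sink with a numeric id column.
create table typed_sink (
  id int,
  gender varchar
) with (
  type='print'
);

-- Fields are read as VARCHAR and converted explicitly.
INSERT INTO typed_sink
SELECT
  CAST(id AS INT),  -- assumes id holds a valid integer string
  gender
FROM mq_stream;
```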