This topic describes how to create a Message Queue for Apache RocketMQ result table in Realtime Compute for Apache Flink. It also describes the parameters in the WITH clause that you use when you create the table.

Notice
  • This topic applies only to Blink 1.4.5 and later.
  • If you need to use Message Queue for Apache RocketMQ that has separate namespaces, use Blink 3.X.

Introduction to Message Queue for Apache RocketMQ

Message Queue for Apache RocketMQ is a professional message middleware that is developed by Alibaba Cloud for commercial use. It is a core product for the enterprise-level Internet architecture. Based on the high-availability distributed cluster technology, Message Queue for Apache RocketMQ provides comprehensive cloud messaging services, including message publishing and subscription, message tracing, resource statistics, message scheduling or delaying, monitoring, and alerts.

CSV format

You can specify Message Queue for Apache RocketMQ tables as result tables for Realtime Compute for Apache Flink to process streaming data. In the following sample code, the DDL statement creates a Message Queue for Apache RocketMQ result table to store streaming data in the CSV format:
CREATE TABLE stream_test_hotline_agent (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  type='mq',
  endpoint='<yourEndpoint>',
  accessID='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  topic='<yourTopicName>',
  producerGroup='<yourGroupName>',
  tag='<yourTagName>',
  encoding='utf-8',
  fieldDelimiter=',',  -- separate fields with commas (CSV)
  retryTimes='5',      -- number of retries for writing data
  sleepTimeMs='500'    -- retry interval, in milliseconds
);
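
The DDL statement above only declares the result table. The following statements are a minimal sketch of how rows could be written to it. The csv_source table and its use of the random connector for INTEGER and BIGINT columns are assumptions made for illustration; replace csv_source with your actual source table.

```sql
-- Hypothetical source table, used only to illustrate the write path.
CREATE TABLE csv_source (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  type='random'
);

-- Each selected row is written to the topic as one message.
INSERT INTO stream_test_hotline_agent
SELECT id, len, content
FROM csv_source;
```

Because fieldDelimiter is set to a comma in the result table, the fields of each row are expected to be joined with commas in the message body.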

Binary format

You can specify Message Queue for Apache RocketMQ tables as result tables for Realtime Compute for Apache Flink to process streaming data. In the following sample code, the DDL statement creates a Message Queue for Apache RocketMQ result table to store streaming data in the binary format:
CREATE TABLE source_table (
  commodity VARCHAR
) WITH (
  type='random'
);

CREATE TABLE result_table (
  mess VARBINARY
) WITH (
  type='mq',
  endpoint='<yourEndpoint>',
  accessID='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  topic='<yourTopicName>',
  producerGroup='<yourGroupName>'
);

INSERT INTO result_table
SELECT
  CAST(SUBSTRING(commodity,0,5) AS VARBINARY) AS mess
FROM source_table;

Note: The CAST(VARCHAR AS VARBINARY) conversion is supported only in Blink 2.0 or later. If the Blink version is earlier than 2.0, update the Blink version first. For more information, see Manage Blink versions of a Realtime Compute for Apache Flink cluster deployed in exclusive mode.

Parameters in the WITH clause

Parameter | Description | Remarks
type | The type of the result table. | Set the value to mq.
topic | The name of the Message Queue for Apache RocketMQ topic to which data is written. | N/A.
endpoint | The endpoint of Message Queue for Apache RocketMQ. | Two types of Message Queue for Apache RocketMQ services are provided: internal Message Queue for Apache RocketMQ and public Message Queue for Apache RocketMQ. Select an endpoint based on the type of Message Queue for Apache RocketMQ that you purchase.
  • For jobs that run on Blink 3.7.10 or later, use Transmission Control Protocol (TCP) endpoints. For more information, see Announcement on the settings of internal TCP endpoints. You can use one of the following methods to obtain the endpoints:
    • Internal endpoints of Message Queue for Apache RocketMQ that resides in the classic network or a virtual private cloud (VPC) of Alibaba Cloud: Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. In the TCP Endpoint section, you can view the endpoint that corresponds to Internal Access.
    • Public endpoint of Message Queue for Apache RocketMQ: Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. In the TCP Endpoint section, you can view the endpoint that corresponds to Internet Access.
  • For jobs that run on a Blink version earlier than 3.7.10, use the following endpoints:
    • Internal endpoints of Message Queue for Apache RocketMQ that resides in the classic network or a VPC of Alibaba Cloud:
      • China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), and China (Hong Kong): onsaddr-internal.aliyun.com:8080.
      • Singapore (Singapore): ap-southeastaddr-internal.aliyun.com:8080.
      • UAE (Dubai): ons-me-east-1-internal.aliyuncs.com:8080.
      • India (Mumbai): ons-ap-south-1-internal.aliyuncs.com:8080.
      • Malaysia (Kuala Lumpur): ons-ap-southeast-3-internal.aliyun.com:8080.
    • Public endpoint of Message Queue for Apache RocketMQ: onsaddr-internet.aliyun.com:80.
Notice
  • If you use the Message Queue for Apache RocketMQ connector in a Blink version earlier than 3.7.10, you must update your Realtime Compute for Apache Flink job to Blink 3.7.10 or later and change the value of the endpoint parameter to the new endpoint of Message Queue for Apache RocketMQ. This reduces the risk of instability or unavailability caused by the old endpoint of Message Queue for Apache RocketMQ. For more information, see Announcement on the upgrade of Realtime Compute for Apache Flink jobs due to the change of endpoints of Message Queue for Apache RocketMQ.
  • Internal Message Queue for Apache RocketMQ does not support cross-region access. For example, if your Realtime Compute for Apache Flink service resides in the China (Hangzhou) region but your Message Queue for Apache RocketMQ service resides in the China (Shanghai) region, Realtime Compute for Apache Flink cannot access this Message Queue for Apache RocketMQ service.
  • By default, Realtime Compute for Apache Flink clusters in exclusive mode cannot access the Internet. If you want to access the Internet, configure a NAT gateway.
  • The network security policies of Alibaba Cloud dynamically change. As a result, network connection issues may occur when Realtime Compute for Apache Flink connects to the public Message Queue for Apache RocketMQ service. We recommend that you use the internal Message Queue for Apache RocketMQ service. If an exception occurs when you use the public Message Queue for Apache RocketMQ service, submit a ticket.
accessID | The AccessKey ID. | N/A.
accessKey | The AccessKey secret. | N/A.
producerGroup | The name of the producer group to which messages are written. | N/A.
tag | The message tag. | Optional. This parameter is empty by default.
fieldDelimiter | The field delimiter. | Optional. Default value: \u0001. The delimiter varies based on the following modes:
  • In read-only mode, the \u0001 delimiter is used. This delimiter is invisible in read-only mode.
  • In edit mode, the ^A delimiter is used.
encoding | The encoding format. | Optional. Default value: utf-8.
retryTimes | The number of retries for writing data to the table. | Optional. Default value: 10.
sleepTimeMs | The retry interval. | Optional. Default value: 1000. Unit: milliseconds.
instanceID | The ID of the Message Queue for Apache RocketMQ instance. | Whether you can use this parameter depends on the instance:
  • If the Message Queue for Apache RocketMQ instance does not have a separate namespace, the instanceID parameter cannot be used.
  • If the Message Queue for Apache RocketMQ instance has a separate namespace, the instanceID parameter is required.
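
The instanceID rule above can be illustrated with the following sketch of a result table for an instance that has a separate namespace. The table name, the columns, and the <yourInstanceId> placeholder are hypothetical. The sketch assumes a job that runs on Blink 3.X, as required for instances with separate namespaces.

```sql
-- Hypothetical result table for an instance with a separate namespace.
CREATE TABLE mq_namespace_result (
  id INTEGER,
  content VARCHAR
) WITH (
  type='mq',
  endpoint='<yourEndpoint>',
  accessID='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  topic='<yourTopicName>',
  producerGroup='<yourGroupName>',
  instanceID='<yourInstanceId>'  -- omit this line if the instance has no separate namespace
);
```

The optional parameters tag, fieldDelimiter, encoding, retryTimes, and sleepTimeMs are left out here, so the default values listed in the table above apply.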