This topic provides the DDL syntax that is used to create a Message Queue for Apache RocketMQ result table, describes the parameters in the WITH clause, and provides sample code.

Note A Message Queue for Apache RocketMQ connector can be used to store data of a result table for streaming jobs and batch jobs.

What is Message Queue for Apache RocketMQ?

Message Queue for Apache RocketMQ is a distributed messaging middleware that is developed by Alibaba Cloud based on Apache RocketMQ. It features low latency, high concurrency, high availability, and high reliability. Message Queue for Apache RocketMQ provides asynchronous decoupling and peak-load shifting for distributed application systems. It also supports features for Internet applications, including massive message accumulation, high throughput, and reliable retry. Flink can write streaming data to Message Queue for Apache RocketMQ as output data.

Prerequisites

Resources are created in the Message Queue for Apache RocketMQ console. For more information, see Create resources.

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Message Queue for Apache RocketMQ connectors.

DDL syntax

create table mq_sink(
  x varchar,
  y varchar,
  z varchar
) with (
  'connector'='mq',
  'topic'='<yourTopicName>',
  'endpoint'='<yourEndpoint>',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessSecret>',
  'producerGroup'='<yourGroupName>'
);

Note Message Queue for Apache RocketMQ is a messaging middleware that stores unstructured data. You do not need to define schemas for Message Queue for Apache RocketMQ tables. The schemas are determined by the service layer. Flink supports messages of Message Queue for Apache RocketMQ only in the CSV and binary formats.

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to mq.
topic The name of the topic. Yes N/A.
endpoint The endpoint of Message Queue for Apache RocketMQ. Yes Message Queue for Apache RocketMQ supports the following types of endpoints:
  • For jobs that run on VVR 3.0.1 or later, use Transmission Control Protocol (TCP) endpoints. For more information, see Announcement on the settings of internal TCP endpoints. You can use one of the following methods to obtain the endpoints:
    • Internal endpoints of Message Queue for Apache RocketMQ that resides in the classic network or a virtual private cloud (VPC) of Alibaba Cloud: Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internal Access in the TCP Endpoint section.
    • Public endpoint of Message Queue for Apache RocketMQ: Log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internet Access in the TCP Endpoint section.
  • For jobs that run on a VVR version earlier than 3.0.1, use the following endpoints:
    • Internal endpoints of Message Queue for Apache RocketMQ that resides in the classic network or a VPC of Alibaba Cloud:
      • China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), and China (Hong Kong): onsaddr-internal.aliyun.com:8080.
      • Singapore (Singapore): ap-southeastaddr-internal.aliyun.com:8080.
      • UAE (Dubai): ons-me-east-1-internal.aliyuncs.com:8080.
      • India (Mumbai): ons-ap-south-1-internal.aliyuncs.com:8080.
      • Malaysia (Kuala Lumpur): ons-ap-southeast-3-internal.aliyun.com:8080.
    • Public endpoint: onsaddr-internet.aliyun.com:80.
Notice
  • If you used the Message Queue for Apache RocketMQ connector in a VVR version earlier than 3.0.1, you must upgrade your Realtime Compute for Apache Flink job to VVR 3.0.1 or later and change the value of the endpoint parameter to the new endpoint of Message Queue for Apache RocketMQ. This prevents stability risks or unavailability issues caused by the old endpoint of Message Queue for Apache RocketMQ. For more information, see Announcement on the upgrade of Realtime Compute for Apache Flink jobs due to the change of endpoints of Message Queue for Apache RocketMQ.
  • The internal Message Queue for Apache RocketMQ service does not support cross-region access. For example, if your Realtime Compute for Apache Flink service is located in the China (Hangzhou) region but your Message Queue for Apache RocketMQ service is located in the China (Shanghai) region, Realtime Compute for Apache Flink cannot access Message Queue for Apache RocketMQ.
  • Due to changes in the network security policies of Alibaba Cloud, connection issues may occur when Realtime Compute for Apache Flink connects to the public Message Queue for Apache RocketMQ service. We recommend that you use the internal Message Queue for Apache RocketMQ service. If an exception occurs when you use the public Message Queue for Apache RocketMQ service, submit a ticket.
accessId The AccessKey ID that is used to access the Message Queue for Apache RocketMQ instance. Yes N/A.
accessKey The AccessKey secret that is used to access the Message Queue for Apache RocketMQ instance. Yes N/A.
producerGroup The name of the producer group. Yes N/A.
tag The message tag. No This parameter is empty by default.
nameServerSubgroup The name server group. No Valid values:
  • Group name for the internal Message Queue for Apache RocketMQ service that resides in the classic network or a VPC of Alibaba Cloud: nsaddr4client-internal.
  • Group name for the public Message Queue for Apache RocketMQ service: nsaddr4client-internet.
Note Only VVR versions from 2.1.1 to 3.0.0 support this parameter. VVR 3.0.1 and later do not support this parameter.
encoding The encoding type. No Default value: utf-8.
retryTimes The number of retries for writing data to the table. No Default value: 10.
sleepTimeMs The retry interval. No Default value: 1000. Unit: milliseconds.
instanceID The ID of a Message Queue for Apache RocketMQ instance. No
  • If the Message Queue for Apache RocketMQ instance does not have a separate namespace, the instanceID parameter cannot be used.
  • If the Message Queue for Apache RocketMQ instance has a separate namespace, the instanceID parameter is required.
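
The instanceID rule above can be illustrated with a minimal sketch. It assumes a Message Queue for Apache RocketMQ instance that has a separate namespace and a job that runs on VVR 3.0.1 or later. The table name mq_namespace_sink and the <yourInstanceId> placeholder are hypothetical and do not come from the original samples. All option names are taken from the parameter table.

```sql
-- Hypothetical result table for an instance that has a separate namespace.
CREATE TEMPORARY TABLE mq_namespace_sink (
  content VARCHAR
) WITH (
  'connector'='mq',
  'endpoint'='<yourEndpoint>',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessSecret>',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>',
  -- Placeholder value. Specify instanceID only if the instance has a separate namespace.
  'instanceID'='<yourInstanceId>'
);
```

If the instance does not have a separate namespace, remove the instanceID option. The nameServerSubgroup option is omitted here because, per the table, VVR 3.0.1 and later do not support it.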

Sample code

  • CSV format
    create table mq_sink (
      id INTEGER,
      len BIGINT,
      content VARCHAR
    ) WITH (
      'connector'='mq',
      'endpoint'='<yourEndpoint>',
      'accessId'='<yourAccessId>',
      'accessKey'='<yourAccessSecret>',
      'topic'='<yourTopicName>',
      'producerGroup'='<yourGroupName>',
      'tag'='<yourTagName>',
      'encoding'='utf-8',
      'retryTimes'='5',
      'sleepTimeMs'='500'
    );
  • Binary format
    CREATE TEMPORARY TABLE datagen_source (
      commodity VARCHAR
    ) WITH ( 
      'connector'='datagen' 
    );
    
    CREATE TEMPORARY TABLE mq_sink (
      mess VARBINARY
    ) WITH (
      'connector'='mq',
      'endpoint'='<yourEndpoint>',
      'accessId'='<yourAccessId>',
      'accessKey'='<yourAccessSecret>',
      'topic'='<yourTopicName>',
      'producerGroup'='<yourGroupName>'
    );
    
    -- Cast a leading substring of commodity to VARBINARY and write it to the result table.
    INSERT INTO mq_sink
    SELECT
      CAST(SUBSTRING(commodity,0,5) AS VARBINARY) AS mess
    FROM datagen_source;
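
The CSV sample above defines only the result table. As a sketch of how such a table might be fed end to end, the following example pairs it with a test source, mirroring the structure of the binary sample. The table names datagen_csv_source and mq_csv_sink are hypothetical, and the example assumes that the built-in datagen connector of Flink is available in your environment to generate random rows.

```sql
-- Test source that generates random rows with the same columns as the sink.
CREATE TEMPORARY TABLE datagen_csv_source (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector'='datagen'
);

-- Result table with the same columns and required options as the CSV sample,
-- declared under a different (hypothetical) name.
CREATE TEMPORARY TABLE mq_csv_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector'='mq',
  'endpoint'='<yourEndpoint>',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessSecret>',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);

-- Write every generated row to the Message Queue for Apache RocketMQ topic.
INSERT INTO mq_csv_sink
SELECT id, len, content
FROM datagen_csv_source;
```

The optional tag, encoding, retryTimes, and sleepTimeMs options are left at their defaults in this sketch. Add them as in the CSV sample if you need to override them.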