This topic describes the data types and parameters that are supported by MetaQ Reader and how to configure MetaQ Reader by using the code editor.

Notice MetaQ Reader supports only exclusive resource groups for Data Integration, but not the shared resource group or custom resource groups for Data Integration. For more information, see Create and use an exclusive resource group for Data Integration, Use a shared resource group, and Create a custom resource group for Data Integration.

Background information

Message Queue is a professional message-oriented middleware that is developed by Alibaba Group. Message Queue provides a complete set of cloud messaging services based on the technologies that are used for building a highly available and distributed cluster. The services include message subscription and publishing, message tracing, scheduled and delayed messages, resource statistics, and monitoring and alerting. Message Queue provides asynchronous decoupling for distributed application systems and is suitable for Internet applications with large amounts of messages and high throughput. It is one of the core services that are used by Alibaba Group to support the promotional events of Double 11.

MetaQ Reader reads real-time data from Message Queue by using Message Queue SDK for Java. Then, MetaQ Reader converts the data types of the data to those supported by Data Integration and sends the converted data to a writer.

How it works

MetaQ Reader subscribes to real-time data from Message Queue by using Message Queue SDK for Java of one of the following versions:
<dependency>
            <groupId>com.taobao.metaq.final</groupId>
            <artifactId>metaq-client</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-sdk</artifactId>
            <version>1.3.1</version>
        </dependency>

Data types

The following table lists the mapping between Data Integration data type and Message Queue data type.
Data Integration data type Message Queue data type
STRING STRING

Parameters

Parameter Description Required
accessId The AccessKey ID that you use to access Message Queue. Yes
accessKey The AccessKey secret that you use to access Message Queue. Yes
consumerId The consumer ID. A consumer is also known as a message subscriber which receives and consumes messages.

The consumer ID is the identifier of a type of consumer. In most cases, the consumers that have the same consumer ID receive and consume the same type of message and use the same consumption logic.

Yes
topicName The topic of the messages that you want to consume. A topic is used to classify messages. It is the primary classifier. Yes
subExpression The subtopic of the messages. Yes
onsChannel The channel that is used for authentication when MetaQ Reader connects to Message Queue. Yes
unitName The destination unit that receives messages. Valid values:
  • sh: center
  • unsz: unit in the China (Shenzhen) region
  • us: United States of America
  • en-us: Europe
  • rg-ru: Russia
  • zbyk: Youku in the China (Zhangjiakou) region
  • unzbyun: Alibaba Cloud in the China (Zhangjiakou) region
  • unshyun: Alibaba Cloud in the China (Shanghai) region
  • lazada-sg: Lazada in Singapore
  • lazada-my: Lazada in Malaysia
  • lazada-vn: Lazada in Vietnam
  • lazada-ph: Lazada in the Philippines
  • lazada-th: Lazada in Thailand
  • lazada-id: Lazada in Indonesia
No
instanceName The name of the consumer instance. No
domainName The endpoint that you use to connect to Message Queue. Yes
contentType The type of the messages. Valid values: singlestringcolumn, text, and json. Yes
beginOffset The offset from which MetaQ Reader starts to read data. Valid values: begin and lastRead. Yes
nullCurrentOffset The offset from which MetaQ Reader starts to read data if the last offset is null. Valid values: begin and current. Yes
fieldDelimiter The column delimiter that is used to separate message strings, such as commas (,). Control characters are supported. Example: \u0001. Yes
column The names of the fields from which you want to read data in the messages. Yes
beginDateTime The start time of data consumption. This parameter specifies the left boundary of a left-closed, right-open interval.

The value of the beginDateTime parameter is a time string in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling time parameters in DataWorks.

No
Note The beginDateTime and endDateTime parameters must be used in pairs.
endDateTime The end time of data consumption. This parameter specifies the right boundary of a left-closed, right-open interval.

The value of the endDateTime parameter is a time string in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling time parameters in DataWorks.

Configure MetaQ Reader by using the code editor

In the following code, a synchronization node is configured to read data from Message Queue. For more information about how to configure a synchronization node by using the code editor, see Create a sync node by using the code editor.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "<yourAccessKeyId>",
                        "accessKey": "<yourAccessKeySecret>",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "***.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}