This topic describes the data types and parameters that are supported by MetaQ Reader, such as the field mappings and connections that you can configure. This topic also provides an example of how to configure MetaQ Reader.
Background information
Message Queue is a professional message-oriented middleware that is developed by Alibaba Group. Based on technologies for building a highly available distributed cluster, Message Queue provides a complete set of cloud messaging services. The services include message subscription and publishing, message tracing, scheduled and delayed messages, resource statistics, and monitoring and alerting. Message Queue provides asynchronous decoupling for distributed application systems and is suitable for Internet applications with large amounts of messages and high throughput. It is one of the core services that are used by Alibaba Group to support Double 11.
MetaQ Reader reads real-time data from Message Queue by using Message Queue SDK for Java. Then, MetaQ Reader converts the data to the transfer protocol of Data Integration and sends the data to a writer.
How it works
MetaQ Reader reads messages by using the following Message Queue SDK for Java dependencies:
<dependency>
    <groupId>com.taobao.metaq.final</groupId>
    <artifactId>metaq-client</artifactId>
    <version>4.0.1</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-sdk</artifactId>
    <version>1.3.1</version>
</dependency>
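The following Java sketch illustrates the general consumption pattern that these dependencies provide: a consumer subscribes to a topic and reads each message body as a string. It is only an illustration of how messages can be read with the SDK, not the internal implementation of MetaQ Reader. The topic, subscription expression, credentials, and consumer ID are placeholder values.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

public class MetaqConsumeSketch {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Placeholder credentials and consumer ID. Replace them with your own values.
        properties.put(PropertyKeyConst.ConsumerId, "Test01");
        properties.put(PropertyKeyConst.AccessKey, "<yourAccessKeyId>");
        properties.put(PropertyKeyConst.SecretKey, "<yourAccessKeySecret>");

        Consumer consumer = ONSFactory.createConsumer(properties);
        // Subscribe to a topic. The second argument is the subscription expression,
        // and "*" matches all sub-topics.
        consumer.subscribe("test", "*", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // Each message body is read as a string. MetaQ Reader converts such strings
                // to the transfer protocol of Data Integration and sends them to a writer.
                String body = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("Received: " + body);
                return Action.CommitMessage;
            }
        });
        consumer.start();
    }
}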
Data type
Data Integration data type | Message Queue data type |
---|---|
STRING | STRING |
Parameters
Parameter | Description | Required |
---|---|---|
accessId | The AccessKey ID that you can use to connect to Message Queue. | Yes |
accessKey | The AccessKey secret that you can use to connect to Message Queue. | Yes |
consumerId | The consumer ID. A consumer, also known as a message subscriber, receives and consumes messages. The consumer ID identifies a group of consumers that generally receive and consume the same type of messages and use the same consumption logic. | Yes |
topicName | The topic of the messages to be consumed. A topic is the primary classifier that is used to classify messages. | Yes |
subExpression | The sub-topic of the messages to be consumed. | Yes |
onsChannel | The channel that is used to authenticate access to Message Queue. | Yes |
unitName | The destination unit that receives messages. | No |
instanceName | The name of the consumer instance. | No |
domainName | The endpoint that you can use to connect to Message Queue. | Yes |
contentType | The type of the messages. Valid values: singlestringcolumn, text, and json. | Yes |
beginOffset | The offset at which the sync node starts to read data. Valid values: begin and lastRead. | Yes |
nullCurrentOffset | The offset at which the sync node starts to read data when the last offset is null. Valid values: begin and current. | Yes |
fieldDelimiter | The column delimiter that is used to separate message strings, such as a comma (,). Control characters such as \u0001 are supported. | Yes |
column | The columns to be synchronized. | Yes |
beginDateTime | The start timestamp from which data is consumed. This parameter defines the left boundary of a left-closed, right-open interval. The value is a time string in the yyyyMMddHHmmss format and can be used together with the scheduling time parameters in DataWorks. Note: The beginDateTime and endDateTime parameters must be used in pairs. | No |
endDateTime | The end timestamp at which data consumption stops. This parameter defines the right boundary of a left-closed, right-open interval. The value is a time string in the yyyyMMddHHmmss format and can be used together with the scheduling time parameters in DataWorks. Note: The beginDateTime and endDateTime parameters must be used in pairs. | No |
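The contentType, fieldDelimiter, and column parameters together determine how each message body is turned into a record. The following sketch illustrates that splitting logic for contentType text, assuming a hypothetical two-column message body; it is not MetaQ Reader's actual implementation.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldSplitSketch {
    public static void main(String[] args) {
        // Assumed sample values: the message body, delimiter, and column names are illustrative only.
        String body = "v1,v2";
        String fieldDelimiter = ",";
        List<String> columns = Arrays.asList("col0", "col1");

        // contentType=text: the body is split by fieldDelimiter and mapped to the configured columns in order.
        String[] values = body.split(fieldDelimiter, -1);
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            record.put(columns.get(i), i < values.length ? values[i] : null);
        }
        System.out.println(record); // {col0=v1, col1=v2}

        // contentType=singlestringcolumn: the entire body is written to a single STRING column instead.
    }
}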
Configuration example
The following sample code shows how to configure a sync node that uses MetaQ Reader:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "<yourAccessKeyId>",
                        "accessKey": "<yourAccessKeySecret>",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "***.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}
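In this example, Stream Writer is used as the writer and its print parameter is set to false, so the data that is read is not written to the run logs. If you set print to true, the records that MetaQ Reader reads are printed to the logs, which can help you verify the reader configuration.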