DataWorks Data Integration provides MetaQ Reader, which reads data from Message Queue. This topic describes the data synchronization capabilities available for MetaQ data sources.
Supported versions
MetaQ Reader subscribes to real-time messages in MetaQ using the Java software development kit (SDK) for Message Queue. The following Java SDK versions are supported.
<dependency>
    <groupId>com.taobao.metaq.final</groupId>
    <artifactId>metaq-client</artifactId>
    <version>4.0.1</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-sdk</artifactId>
    <version>1.3.1</version>
</dependency>
Limits
You can use the MetaQ Reader plugin to read data from Message Queue only in the code editor.
MetaQ Reader supports Serverless resource groups (recommended) and exclusive resource groups for Data Integration.
Data types
The following table lists the supported field types.
Field type | Offline read (MetaQ Reader) |
STRING | Supported |
MetaQ Reader maps Message Queue data types to Data Integration data types as follows.
Data Integration data type | Message Queue data type |
STRING | STRING |
Develop a data synchronization node
To find the entry point and the procedure for configuring a synchronization task, see the following configuration guides.
For the configuration procedure, see Configure a task in the code editor.
For a list of all parameters and a code sample for the code editor, see Appendix: Code and parameters.
Appendix: Code and parameters
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization task by using the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader code sample
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "<yourAccessKeyId>",
                        "accessKey": "<yourAccessKeySecret>",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "***.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}
Reader script parameters
Parameter | Description | Required |
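accessId | The AccessKey ID of the AccessKey pair for Message Queue. The AccessKey pair is used for identity authentication. | Yes |
accessKey | The AccessKey secret of the AccessKey pair for Message Queue. The AccessKey pair is used for identity authentication. | Yes |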
consumerId | A consumer, also known as a subscriber, receives and consumes messages. The consumer ID is the identifier of a type of consumer. In most cases, the consumers that have the same consumer ID receive and consume the same type of message and use the same consumption logic. | Yes |
topicName | The message topic. A topic is a primary message type used to classify messages. | Yes |
subExpression | The message subtopic. | Yes |
onsChannel | Used for Message Queue authentication. | Yes |
unitName | The destination unit that receives messages. | No |
instanceName | The name of the consumer instance. | No |
domainName | The endpoint of Message Queue. | Yes |
contentType | The message type. Supported types are singlestringcolumn for STRING messages, text for text messages, and json for JSON messages. | Yes |
beginOffset | The offset from which the task starts to read data. Valid values: begin (from the earliest offset) and lastRead (from the last read offset). | No |
nullCurrentOffset | The position from which to start reading data if the last offset is empty. Valid values: begin (from the earliest offset) and current (from the current offset). | Yes |
fieldDelimiter | The column delimiter for message strings in separator mode, such as a comma (,). Control characters, such as \u0001, are supported. | Yes |
column | The list of fields to read. | Yes |
beginDateTime | The start time for data consumption, which is the left boundary of a left-closed, right-open time range. Specify a time string in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling parameters in DataWorks. Note: The beginDateTime and endDateTime parameters must be used together. | No |
endDateTime | The end time for data consumption, which is the right boundary of a left-closed, right-open time range. Specify a time string in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling parameters in DataWorks. Note: The beginDateTime and endDateTime parameters must be used together. | No |
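The beginDateTime and endDateTime rows above describe a left-closed, right-open time range in the yyyyMMddHHmmss format. The following Python sketch illustrates what such a pair of values looks like for an hourly window and how the boundaries behave. It is only an illustration under stated assumptions: the helper names hourly_window and in_window are hypothetical and are not part of MetaQ Reader, and in a real task the two values would typically be supplied through DataWorks scheduling parameters rather than computed by hand.

```python
from datetime import datetime, timedelta

# Python strftime equivalent of the yyyyMMddHHmmss pattern used by the reader.
TIME_FORMAT = "%Y%m%d%H%M%S"


def hourly_window(run_time: datetime) -> dict:
    """Hypothetical helper: build the window covering the full hour before run_time."""
    end = run_time.replace(minute=0, second=0, microsecond=0)
    begin = end - timedelta(hours=1)
    return {
        "beginDateTime": begin.strftime(TIME_FORMAT),
        "endDateTime": end.strftime(TIME_FORMAT),
    }


def in_window(message_time: datetime, window: dict) -> bool:
    """Hypothetical helper: left-closed, right-open membership check."""
    begin = datetime.strptime(window["beginDateTime"], TIME_FORMAT)
    end = datetime.strptime(window["endDateTime"], TIME_FORMAT)
    return begin <= message_time < end


window = hourly_window(datetime(2024, 5, 1, 10, 17, 42))
print(window)
# {'beginDateTime': '20240501090000', 'endDateTime': '20240501100000'}
```

Because the range is right-open, a message stamped exactly 10:00:00 falls outside this window and belongs to the next one, so consecutive windows neither overlap nor leave gaps.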