MetaQ Reader is a DataWorks Data Integration plugin that reads data from Message Queue by subscribing to real-time messages using the Java software development kit (SDK).
Supported versions
MetaQ Reader uses the following Java SDK versions:
<dependency>
  <groupId>com.taobao.metaq.final</groupId>
  <artifactId>metaq-client</artifactId>
  <version>4.0.1</version>
</dependency>
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-sdk</artifactId>
  <version>1.3.1</version>
</dependency>
Limits
MetaQ Reader is available only in the code editor. The GUI mode is not supported.
Supported resource groups: Serverless resource groups (recommended) and exclusive resource groups for Data Integration.
Data types
MetaQ Reader supports the STRING field type for offline reads.
| Field type | Offline read (MetaQ Reader) |
|---|---|
| STRING | Supported |
Type mapping between Data Integration and Message Queue:
| Data Integration data type | Message Queue data type |
|---|---|
| STRING | STRING |
Configure a synchronization task
Configure the synchronization task using the code editor. For the step-by-step procedure, see Configure a task in the code editor.
For the complete parameter reference and a ready-to-use code sample, see Appendix: Code and parameters.
Appendix: Code and parameters
All parameters go under reader.parameter in the job script.
Code sample
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "metaqreader",
          "parameter": {
            "accessId": "<your-access-key-id>",
            "accessKey": "<your-access-key-secret>",
            "consumerId": "Test01",
            "topicName": "test",
            "subExpression": "*",
            "onsChannel": "ALIYUN",
            "domainName": "<your-message-queue-endpoint>",
            "contentType": "singlestringcolumn",
            "beginOffset": "lastRead",
            "nullCurrentOffset": "begin",
            "fieldDelimiter": ",",
            "column": ["col0"]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": false
          }
        }
      }
    ]
  }
}
Replace the following placeholders with actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-access-key-id> | AccessKey ID for authentication | LTAI5tXxx |
| <your-access-key-secret> | AccessKey Secret for authentication | xXxXxXx |
| <your-message-queue-endpoint> | Endpoint of your Message Queue instance | xxx.aliyun.com |
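
One way to keep the AccessKey pair out of the stored job script is to fill the placeholders just before the script is submitted. The following is a minimal sketch, assuming the script is kept as a template string and the values come from environment variables. The variable names MQ_ACCESS_ID, MQ_ACCESS_KEY, and MQ_ENDPOINT and the helper fill_placeholders are hypothetical and are not part of DataWorks.

```python
import json
import os

# Placeholder -> environment variable (hypothetical variable names).
PLACEHOLDER_ENV = {
    "<your-access-key-id>": "MQ_ACCESS_ID",
    "<your-access-key-secret>": "MQ_ACCESS_KEY",
    "<your-message-queue-endpoint>": "MQ_ENDPOINT",
}


def fill_placeholders(template: str, env=None) -> dict:
    """Replace each placeholder in the template and parse the result as JSON."""
    env = os.environ if env is None else env
    for placeholder, var in PLACEHOLDER_ENV.items():
        if var not in env:
            raise KeyError(f"environment variable {var} is not set")
        # json.dumps escapes quotes and backslashes; the outer quotes are
        # dropped because the placeholder already sits inside a JSON string.
        template = template.replace(placeholder, json.dumps(env[var])[1:-1])
    return json.loads(template)
```

Passing a plain dict as env instead of relying on os.environ makes the helper easy to exercise in a unit test.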
Parameter reference
Required parameters
| Parameter | Description |
|---|---|
| accessId | AccessKey ID for authentication. Get your AccessKey pair from the Alibaba Cloud console. |
| accessKey | AccessKey Secret for authentication. |
| consumerId | Consumer ID that identifies the consumer type. Consumers sharing the same ID receive the same message type and apply the same consumption logic. |
| topicName | Topic name. A topic is the primary classification for messages. |
| subExpression | Tag filter expression for the message subtopic. Use * to subscribe to all tags. |
| onsChannel | Authentication channel for Message Queue. |
| domainName | Endpoint of your Message Queue instance. |
| contentType | Message format. Valid values: singlestringcolumn (STRING), text (plain text), json (JSON). |
| nullCurrentOffset | Starting position when no previous offset exists. Valid values: begin (earliest available message) or current (current position). |
| fieldDelimiter | Field delimiter for messages in singlestringcolumn mode, such as a comma (,). Control characters like \u0001 are supported. Only applies when contentType is singlestringcolumn. |
| column | List of fields to read. |
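
The required parameters listed above can be checked locally before a job is submitted. The following is a minimal sketch, assuming the reader.parameter object has already been parsed into a Python dict. validate_reader is a hypothetical helper, not a DataWorks API, and it checks only what the table states: presence of each required key and the documented valid values.

```python
# Required keys and valid values, taken from the table above.
REQUIRED = (
    "accessId", "accessKey", "consumerId", "topicName", "subExpression",
    "onsChannel", "domainName", "contentType", "nullCurrentOffset",
    "fieldDelimiter", "column",
)
CONTENT_TYPES = {"singlestringcolumn", "text", "json"}
NULL_OFFSETS = {"begin", "current"}


def validate_reader(parameter: dict) -> list:
    """Return a list of problems; an empty list means every check passed."""
    problems = [
        f"missing required parameter: {name}"
        for name in REQUIRED
        if name not in parameter
    ]
    content_type = parameter.get("contentType")
    if content_type is not None and content_type not in CONTENT_TYPES:
        problems.append(f"invalid contentType: {content_type}")
    null_offset = parameter.get("nullCurrentOffset")
    if null_offset is not None and null_offset not in NULL_OFFSETS:
        problems.append(f"invalid nullCurrentOffset: {null_offset}")
    if "column" in parameter and not isinstance(parameter["column"], list):
        problems.append("column must be a list of field names")
    return problems
```

Returning a list of problems instead of raising on the first one lets a caller report every issue in a single pass.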
Optional parameters
| Parameter | Description |
|---|---|
| beginOffset | Starting offset for the task. Valid values: begin (earliest available message) or lastRead (resume from the last read offset). When set to lastRead and no previous offset exists, the behavior is controlled by nullCurrentOffset. |
| unitName | Destination unit that receives messages. Common values: sh (center), unsz (China, Shenzhen), us (United States), en-us (Europe), rg-ru (Russia), zbyk (Youku, Zhangjiakou), unzbyun (Zhangbei Cloud), unshyun (Alibaba Cloud, Shanghai), lazada-sg, lazada-my, lazada-vn, lazada-ph, lazada-th, lazada-id. |
| instanceName | Name of the consumer instance. |
| beginDateTime | Start time for data consumption in yyyyMMddHHmmss format. Defines the left boundary of a left-closed, right-open time range. Must be used together with endDateTime. Supports DataWorks scheduling parameters. |
| endDateTime | End time for data consumption in yyyyMMddHHmmss format. Defines the right boundary of a left-closed, right-open time range. Must be used together with beginDateTime. Supports DataWorks scheduling parameters. |
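
Two rules from the table above are easier to see in code: how beginOffset falls back to nullCurrentOffset, and what a left-closed, right-open time range includes. The following is an illustrative sketch only, not the plugin's implementation. The function names, the one-hour window, and the idea that a message carries a single comparable timestamp are assumptions made for the example.

```python
from datetime import datetime, timedelta

FMT = "%Y%m%d%H%M%S"  # Python equivalent of yyyyMMddHHmmss


def resolve_start(begin_offset, null_current_offset, last_read=None):
    """Return where reading starts: "begin", "current", or a saved offset."""
    if begin_offset == "begin":
        return "begin"
    if begin_offset == "lastRead":
        # No saved offset: nullCurrentOffset ("begin" or "current") decides.
        return last_read if last_read is not None else null_current_offset
    raise ValueError(f"unsupported beginOffset: {begin_offset}")


def hourly_window(run_time: datetime) -> dict:
    """Build beginDateTime/endDateTime for the full hour before run_time."""
    end = run_time.replace(minute=0, second=0, microsecond=0)
    begin = end - timedelta(hours=1)
    return {
        "beginDateTime": begin.strftime(FMT),
        "endDateTime": end.strftime(FMT),
    }


def in_window(message_time: datetime, window: dict) -> bool:
    """Left-closed, right-open check: begin <= message_time < end."""
    begin = datetime.strptime(window["beginDateTime"], FMT)
    end = datetime.strptime(window["endDateTime"], FMT)
    return begin <= message_time < end
```

Because the range is right-open, consecutive hourly windows share a boundary without overlapping: a message stamped exactly on the hour belongs to the later window only.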