All Products
Search
Document Center

DataWorks:MetaQ

Last Updated:Mar 26, 2026

MetaQ Reader is a DataWorks Data Integration plugin that reads data from Message Queue by subscribing to real-time messages using the Java software development kit (SDK).

Supported versions

MetaQ Reader uses the following Java SDK versions:

<dependency>
    <groupId>com.taobao.metaq.final</groupId>
    <artifactId>metaq-client</artifactId>
    <version>4.0.1</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-sdk</artifactId>
    <version>1.3.1</version>
</dependency>

Limits

Data types

MetaQ Reader supports the STRING field type for offline reads.

Field typeOffline read (MetaQ Reader)
STRINGSupported

Type mapping between Data Integration and Message Queue:

Data Integration data typeMessage Queue data type
STRINGSTRING

Configure a synchronization task

Configure the synchronization task using the code editor. For the step-by-step procedure, see Configure a task in the code editor.

For the complete parameter reference and a ready-to-use code sample, see Appendix: Code and parameters.

Appendix: Code and parameters

All parameters go under reader.parameter in the job script.

Code sample

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "metaqreader",
          "parameter": {
            "accessId": "<your-access-key-id>",
            "accessKey": "<your-access-key-secret>",
            "consumerId": "Test01",
            "topicName": "test",
            "subExpression": "*",
            "onsChannel": "ALIYUN",
            "domainName": "<your-message-queue-endpoint>",
            "contentType": "singlestringcolumn",
            "beginOffset": "lastRead",
            "nullCurrentOffset": "begin",
            "fieldDelimiter": ",",
            "column": ["col0"]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": false
          }
        }
      }
    ]
  }
}

Replace the following placeholders with actual values:

PlaceholderDescriptionExample
<your-access-key-id>AccessKey ID for authenticationLTAI5tXxx
<your-access-key-secret>AccessKey Secret for authenticationxXxXxXx
<your-message-queue-endpoint>Endpoint of your Message Queue instancexxx.aliyun.com

Parameter reference

Required parameters

ParameterDescription
accessIdAccessKey ID for authentication. Get your AccessKey pair from the Alibaba Cloud console.
accessKeyAccessKey Secret for authentication.
consumerIdConsumer ID that identifies the consumer type. Consumers sharing the same ID receive the same message type and apply the same consumption logic.
topicNameTopic name. A topic is the primary classification for messages.
subExpressionTag filter expression for the message subtopic. Use * to subscribe to all tags.
onsChannelAuthentication channel for Message Queue.
domainNameEndpoint of your Message Queue instance.
contentTypeMessage format. Valid values: singlestringcolumn (STRING), text (plain text), json (JSON).
nullCurrentOffsetStarting position when no previous offset exists. Valid values: begin (earliest available message) or current (current position).
fieldDelimiterField delimiter for messages in singlestringcolumn mode, such as a comma (,). Control characters like \u0001 are supported. Only applies when contentType is singlestringcolumn.
columnList of fields to read.

Optional parameters

ParameterDescription
beginOffsetStarting offset for the task. Valid values: begin (earliest available message) or lastRead (resume from the last read offset). When set to lastRead and no previous offset exists, the behavior is controlled by nullCurrentOffset.
unitNameDestination unit that receives messages. Common values: sh (center), unsz (China, Shenzhen), us (United States), en-us (Europe), rg-ru (Russia), zbyk (Youku, Zhangjiakou), unzbyun (Zhangbei Cloud), unshyun (Alibaba Cloud, Shanghai), lazada-sg, lazada-my, lazada-vn, lazada-ph, lazada-th, lazada-id.
instanceNameName of the consumer instance.
beginDateTimeStart time for data consumption in yyyyMMddHHmmss format. Defines the left boundary of a left-closed, right-open time range. Must be used together with endDateTime. Supports DataWorks scheduling parameters.
endDateTimeEnd time for data consumption in yyyyMMddHHmmss format. Defines the right boundary of a left-closed, right-open time range. Must be used together with beginDateTime. Supports DataWorks scheduling parameters.