DataWorks: DataHub data source

Last Updated: Apr 23, 2026

The DataHub data source provides a bidirectional channel for reading data from and writing data to DataHub, enabling efficient processing of large-scale data. This topic describes the data synchronization capabilities that DataWorks provides for DataHub.

Supported versions

  • DataHub Reader uses the DataHub SDK for Java to read data from DataHub. The following code shows the SDK version.

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.9.1</version>
    </dependency>
  • DataHub Writer uses the DataHub SDK for Java to write data to DataHub. The following code shows the SDK version.

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.5.1</version>
    </dependency>

Limitations

Batch read/write

The STRING data type supports only UTF-8 encoding. A single STRING column cannot exceed 1 MB.
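The limit above can be checked before a task runs. The following is a minimal sketch, not part of DataWorks: the function name `check_string_column` is hypothetical, and it assumes that 1 MB means 1,048,576 bytes measured on the UTF-8 encoded value.

```python
# Assumption: "1 MB" is interpreted as 1,048,576 bytes of UTF-8 encoded data.
MAX_STRING_BYTES = 1 * 1024 * 1024


def check_string_column(value: str) -> int:
    """Return the UTF-8 size of value in bytes, or raise if it exceeds the limit."""
    # encode() raises UnicodeEncodeError if the text cannot be represented in UTF-8.
    encoded = value.encode("utf-8")
    if len(encoded) > MAX_STRING_BYTES:
        raise ValueError(
            f"STRING value is {len(encoded)} bytes; limit is {MAX_STRING_BYTES} bytes"
        )
    return len(encoded)
```

Note that the size is counted in bytes, not characters, so a value with many multi-byte characters reaches the limit sooner than its length suggests.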

Real-time read/write

  • Real-time synchronization tasks support serverless resource groups.

  • When you synchronize data to DataHub in real time, data is sharded based on the hash value of the source data. Records with the same hash value are synchronized to the same shard.
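The sharding rule in the last item can be illustrated with a short sketch. This topic does not state which hash function DataWorks uses or which source fields feed it, so the CRC32 hash, the `key` argument, and the `pick_shard` name below are all assumptions chosen only to show the principle.

```python
import zlib


def pick_shard(key: str, shard_count: int) -> int:
    """Map a record key to a shard index in the range [0, shard_count)."""
    if shard_count <= 0:
        raise ValueError("shard_count must be positive")
    # Assumption: CRC32 stands in for the real, undocumented hash function.
    hash_value = zlib.crc32(key.encode("utf-8"))
    # Equal hash values always produce the same shard index.
    return hash_value % shard_count
```

Because the index depends only on the hash value, records that share a key keep their relative order within one shard, while unrelated keys spread across shards.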

Real-time full-database write

When you run the task, a batch synchronization task first writes the full data to DataHub. Then, a real-time synchronization task starts, synchronizing incremental data from the source to the destination. Data is written based on the following rules:

  • Data can be written only to DataHub topics of the TUPLE type. For more information about the TUPLE data type, see Data types.

  • When you synchronize data to DataHub in real time, five additional fields are appended to the source table fields. You can also add other fields when you configure the task. For more information about the final message format sent to DataHub, see Appendix: DataHub message formats.

Supported field types

When data is synchronized to DataHub, each value is mapped to the corresponding DataHub field type. DataHub supports only the BIGINT, STRING, BOOLEAN, DOUBLE, TIMESTAMP, and DECIMAL data types.
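As an illustration of mapping values onto this fixed set of types, the sketch below classifies Python values. The mapping itself is an assumption for demonstration only: this topic does not document how DataWorks maps source database types, and `datahub_field_type` is a hypothetical helper.

```python
from datetime import datetime
from decimal import Decimal


def datahub_field_type(value) -> str:
    """Return the name of a DataHub field type that could hold value."""
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, Decimal):
        return "DECIMAL"
    if isinstance(value, datetime):
        return "TIMESTAMP"
    if isinstance(value, str):
        return "STRING"
    # Anything else has no counterpart among the six supported types.
    raise TypeError(f"no DataHub field type for {type(value).__name__}")
```

A value with no counterpart, such as a list, raises an error here, which mirrors the fact that only the six listed types are supported.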

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data source management. You can view parameter descriptions in the DataWorks console to understand the meanings of the parameters when you add a data source.

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.

Configure a single-table batch synchronization task

Configure a single-table real-time synchronization task

For the procedure, see Configure a real-time synchronization task.

Note

For information about operations supported by different DataHub data types, sharding strategies, data formats, and related message examples, see Appendix: DataHub message formats.

Configure a real-time full-database synchronization task

For the procedure, see Configure a real-time full-database synchronization task.

FAQ

Write to DataHub fails because the data exceeds the limit

Appendix: Script demos and parameter description

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Use the Code Editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script demo

{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
         "job": {
           "content": [
            {
                "reader": {
                    "name": "datahubreader",
                    "parameter": {
                        "endpoint": "xxx", // The endpoint of DataHub.
                        "accessId": "xxx", // The AccessKey ID used to access DataHub.
                        "accessKey": "xxx", // The AccessKey secret used to access DataHub.
                        "project": "xxx", // The name of the DataHub project.
                        "topic": "xxx", // The name of the DataHub topic.
                        "batchSize": 1000, // The number of records to read at a time.
                        "beginDateTime": "20180910111214", // The start time for data consumption.
                        "endDateTime": "20180910111614", // The end time for data consumption.
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                                  ]
                                }
                           },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                                 }
                            }
             }
           ]
         }
       }
     ],
    "setting":{
        "errorLimit":{
            "record":"0"// The error count.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. If throttle is set to false, the mbps parameter does not take effect, which means throttling is disabled. If throttle is set to true, throttling is enabled.
            "concurrent":1,// The concurrency.
            "mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader script parameters

| Parameter | Description | Required |
| --- | --- | --- |
| endpoint | The endpoint of DataHub. | Yes |
| accessId | The AccessKey ID used to access DataHub. | Yes |
| accessKey | The AccessKey secret used to access DataHub. | Yes |
| project | The name of the DataHub project. A project is the basic resource management unit in DataHub and is used for resource isolation and access control. | Yes |
| topic | The name of the DataHub topic. | Yes |
| batchSize | The number of records to read at a time. Default value: 1,024. | No |
| beginDateTime | The start time for data consumption. This parameter specifies the left boundary (inclusive) of the time range and must be a time string in the yyyyMMddHHmmss format. To implement incremental synchronization, use this parameter together with DataWorks scheduling parameters. For example, set the name of the node scheduling parameter to bizdate and its value to $[yyyymmdd-1]. Then set beginDateTime to ${bizdate}000000, which sets the start time for data consumption to 00:00:00 of the previous day. Note: beginDateTime and endDateTime must be used together. | Yes |
| endDateTime | The end time for data consumption. This parameter specifies the right boundary (exclusive) of the time range and must be a time string in the yyyyMMddHHmmss format. To implement incremental synchronization, use this parameter together with DataWorks scheduling parameters. For example, set the name of the node scheduling parameter to bizdate and its value to $[yyyymmdd-1]. Then set endDateTime to ${bizdate}235959, which sets the end time for data consumption to 23:59:59 of the previous day. Note: beginDateTime and endDateTime must be used together. | Yes |
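The beginDateTime and endDateTime example above can be traced with a small sketch. It assumes that $[yyyymmdd-1] resolves to the day before the run date; the helper names `previous_day_range` and `in_range` are hypothetical and exist only to show the inclusive left boundary and the exclusive right boundary.

```python
from datetime import datetime, timedelta
from typing import Tuple

# Python equivalent of the yyyyMMddHHmmss format required by both parameters.
FMT = "%Y%m%d%H%M%S"


def previous_day_range(run_date: datetime) -> Tuple[str, str]:
    """Build (beginDateTime, endDateTime) for the day before run_date."""
    # Assumption: bizdate = $[yyyymmdd-1] is the run date minus one day.
    bizdate = (run_date - timedelta(days=1)).strftime("%Y%m%d")
    return bizdate + "000000", bizdate + "235959"


def in_range(record_time: str, begin: str, end: str) -> bool:
    """Check membership in the half-open range [begin, end)."""
    t = datetime.strptime(record_time, FMT)
    return datetime.strptime(begin, FMT) <= t < datetime.strptime(end, FMT)
```

For a run on September 11, 2018, `previous_day_range(datetime(2018, 9, 11))` yields `20180910000000` and `20180910235959`. Under the boundary semantics described above, a record stamped exactly at the end time falls outside the range, which is worth keeping in mind when consecutive runs must not leave gaps.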

Writer script demo

{
    "type": "job",
    "version": "2.0",// The version number.
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",// The plug-in name.
            "parameter": {
                "datasource": "",// The data source.
                "topic": "",// A topic is the smallest unit for subscription and publishing in DataHub. You can use a topic to represent a type or category of streaming data.
                "maxRetryCount": 500,// The maximum number of retries upon task failure.
                "maxCommitSize": 1048576// The data is committed in batches to the destination when the accumulated data buffer reaches the maxCommitSize (in bytes).
                 // DataHub limits a single request to 10,000 records. Exceeding this limit causes a task error. Set this parameter based on the average size per record multiplied by 10,000. For example, if each record is 10 KB, set this parameter to a value lower than 10 × 10,000 KB.
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// The error count.
        },
        "speed": {
            "throttle":true,// Specifies whether to enable throttling. If throttle is set to false, the mbps parameter does not take effect, which means throttling is disabled. If throttle is set to true, throttling is enabled.
            "concurrent":20, // The concurrency.
            "mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer script parameters

| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| accessId | The AccessKey ID used to access DataHub. | Yes | N/A |
| accessKey | The AccessKey secret used to access DataHub. | Yes | N/A |
| endPoint | The endpoint of DataHub. To access DataHub resources, you must select the correct domain name based on the service to which the resource belongs. | Yes | N/A |
| maxRetryCount | The maximum number of retries upon task failure. | No | N/A |
| mode | The write mode when the value is of the STRING type. | Yes | N/A |
| parseContent | The content to parse. | Yes | N/A |
| project | A project is the basic organizational unit for DataHub data. A project contains multiple topics. Note: DataHub projects are independent of MaxCompute projects. Projects that you create in MaxCompute cannot be reused in DataHub. You must create projects separately. | Yes | N/A |
| topic | A topic is the smallest unit for subscription and publishing in DataHub. You can use a topic to represent a type or category of streaming data. | Yes | N/A |
| maxCommitSize | To improve write efficiency, DataX accumulates data in a buffer and commits the data to the destination in batches when the accumulated data reaches maxCommitSize (in bytes). DataHub limits a single request to 10,000 records. Exceeding this limit causes a task error. Set this parameter to a value lower than the average record size multiplied by 10,000 so that each request stays within the record limit. | No | 1,048,576 (1 MB) |
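The sizing rule for maxCommitSize is simple arithmetic, sketched below. The 10,000-record limit and the 1,048,576-byte default come from the description above; the function names are hypothetical, 1 KB is assumed to be 1,024 bytes, and the per-commit record count is only an estimate because real records vary in size.

```python
MAX_RECORDS_PER_REQUEST = 10_000     # DataHub limit on records per request
DEFAULT_MAX_COMMIT_SIZE = 1_048_576  # default maxCommitSize in bytes (1 MB)


def max_commit_size_ceiling(avg_record_bytes: int) -> int:
    """Return the value that maxCommitSize should stay below, in bytes."""
    if avg_record_bytes <= 0:
        raise ValueError("avg_record_bytes must be positive")
    return avg_record_bytes * MAX_RECORDS_PER_REQUEST


def estimated_records_per_commit(max_commit_size: int, avg_record_bytes: int) -> int:
    """Estimate how many average-sized records fill one commit buffer."""
    if avg_record_bytes <= 0:
        raise ValueError("avg_record_bytes must be positive")
    return max_commit_size // avg_record_bytes
```

For records that average 10 KB, the ceiling is 10 × 1,024 × 10,000 = 102,400,000 bytes, and the default buffer holds roughly 102 such records per commit. The risk of exceeding 10,000 records per request grows as records get smaller: at an average of 100 bytes, the ceiling is 1,000,000 bytes, which is below the default, so maxCommitSize would need to be lowered.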