DataWorks:DataHub data source

Last Updated: Jan 15, 2024

DataWorks provides DataHub Reader and DataHub Writer for you to read data from and write data to DataHub data sources, which facilitates fast computing on large amounts of data. This topic describes the data synchronization capabilities of DataHub data sources.

Supported DataHub versions

  • DataHub Reader reads data from DataHub by using DataHub SDK for Java. The following code shows the Maven dependency of DataHub SDK for Java:

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.9.1</version>
    </dependency>
  • DataHub Writer writes data to DataHub by using DataHub SDK for Java. The following code shows the Maven dependency of DataHub SDK for Java:

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.5.1</version>
    </dependency>

Limits

Batch data read and write

Strings must be encoded in the UTF-8 format. The size of each string must not exceed 1 MB.

Real-time data read and write

  • Real-time synchronization tasks support only exclusive resource groups for Data Integration.

  • When you synchronize data to a DataHub data source in real time, a hash value is calculated for the source data. Records that have the same hash value are synchronized to the same shard in the DataHub data source.

Real-time write of full and incremental data

After you run a synchronization solution, full data in the source is written to the destination by using batch synchronization tasks. Then, incremental data in the source is written to the destination by using real-time synchronization tasks. When you write data to DataHub, take note of the following points:

  • You can write data only to topics of the TUPLE type. For more information about the data types that are supported by a TUPLE topic, see Data types.

  • When you run a real-time synchronization task to synchronize data to DataHub, five additional fields are added to the destination topic by default. You can also add other fields to the destination topic based on your business requirements. For more information about the DataHub message formats, see Appendix: DataHub message formats.

Data type mappings

Data is synchronized based on the mappings between the data types of fields in DataHub and those in a specified service. DataHub supports only the following data types: BIGINT, STRING, BOOLEAN, DOUBLE, TIMESTAMP, and DECIMAL.

Develop a data synchronization task

For information about the entry point and the procedure for configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a data source

Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.

Configure a batch synchronization task to synchronize data of a single table

Configure a real-time synchronization task to synchronize data of a single table or synchronize all data of a database

For more information about the configuration procedure, see Configure a real-time synchronization task in DataStudio.

Note

For information about support of different topic types for synchronization of data changes generated by operations on a source table, sharding strategies for different topic types, data formats, and sample messages, see Appendix: DataHub message formats.

Configure synchronization settings to implement (real-time) synchronization of full and incremental data in a single table or a database

For more information about the configuration procedure, see Configure a synchronization task in Data Integration.

FAQ

What do I do if data fails to be written to DataHub because the amount of data that I want to write to DataHub at a time exceeds the upper limit?

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for DataHub Reader

{
    "type": "job",
    "version": "2.0", // The version number.
    "steps": [
        {
            "job": {
                "content": [
                    {
                        "reader": {
                            "name": "DataHubreader",
                            "parameter": {
                                "endpoint": "xxx", // The endpoint of DataHub.
                                "accessId": "xxx", // The AccessKey ID that is used to connect to DataHub.
                                "accessKey": "xxx", // The AccessKey secret that is used to connect to DataHub.
                                "project": "xxx", // The name of the DataHub project from which you want to read data.
                                "topic": "xxx", // The name of the DataHub topic from which you want to read data.
                                "batchSize": 1000, // The number of data records to read at a time.
                                "beginDateTime": "20180910111214", // The start time of data consumption.
                                "endDateTime": "20180910111614", // The end time of data consumption.
                                "column": [
                                    "col0",
                                    "col1",
                                    "col2",
                                    "col3",
                                    "col4"
                                ]
                            }
                        },
                        "writer": {
                            "name": "streamwriter",
                            "parameter": {
                                "print": false
                            }
                        }
                    }
                ]
            }
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0" // The maximum number of dirty data records allowed.
        },
        "speed": {
            "throttle": true, // Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "concurrent": 1, // The maximum number of parallel threads.
            "mbps": "12" // The maximum transmission rate. Unit: MB/s.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Parameters in code for DataHub Reader

  • endpoint: The endpoint of DataHub. Required: Yes.

  • accessId: The AccessKey ID that is used to connect to DataHub. Required: Yes.

  • accessKey: The AccessKey secret that is used to connect to DataHub. Required: Yes.

  • project: The name of the DataHub project from which you want to read data. DataHub projects are the resource management units in DataHub for resource isolation and control. Required: Yes.

  • topic: The name of the DataHub topic from which you want to read data. Required: Yes.

  • batchSize: The number of data records to read at a time. Required: No. Default value: 1024.

  • beginDateTime: The start time of data consumption. This parameter specifies the left boundary of a left-closed, right-open interval. Specify the start time in the format of yyyyMMddHHmmss. The parameter can be used together with the scheduling time parameters in DataWorks. The beginDateTime and endDateTime parameters must be used in pairs. Required: Yes.

  • endDateTime: The end time of data consumption. This parameter specifies the right boundary of a left-closed, right-open interval. Specify the end time in the format of yyyyMMddHHmmss. The parameter can be used together with the scheduling time parameters in DataWorks. The beginDateTime and endDateTime parameters must be used in pairs. Required: Yes.
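
The beginDateTime and endDateTime parameters are often combined with DataWorks scheduling time parameters so that each scheduled instance consumes a different time window. The following snippet is a minimal sketch that assumes two custom scheduling parameters, named begin_time and end_time here only for illustration (for example, begin_time=$[yyyymmddhh24miss-1/24] and end_time=$[yyyymmddhh24miss]), are defined in the scheduling properties of the task; the parameter names and expressions are examples, not fixed values.

"reader": {
    "name": "DataHubreader",
    "parameter": {
        "endpoint": "xxx",
        "accessId": "xxx",
        "accessKey": "xxx",
        "project": "xxx",
        "topic": "xxx",
        "batchSize": 1000,
        "beginDateTime": "${begin_time}", // Replaced at run time with the value of the assumed begin_time scheduling parameter.
        "endDateTime": "${end_time}", // Replaced at run time with the value of the assumed end_time scheduling parameter.
        "column": [
            "col0",
            "col1"
        ]
    }
}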

Code for DataHub Writer

{
    "type": "job",
    "version": "2.0", // The version number.
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub", // The plug-in name.
            "parameter": {
                "datasource": "", // The name of the data source.
                "topic": "", // The minimum unit for data subscription and publishing. You can use topics to distinguish different types of streaming data.
                "maxRetryCount": 500, // The maximum number of retries if the synchronization task fails.
                "maxCommitSize": 1048576 // The maximum amount of the buffered data that Data Integration can accumulate before it commits the data to the destination. Unit: bytes.
                // DataHub allows for a maximum of 10,000 data records to be written in a single request. If the number of data records exceeds 10,000, the synchronization task fails. In this case, the maximum amount of data that can be written in a single request is calculated by using the following formula: Average amount of data in a single data record × 10,000. You need to set maxCommitSize to a value less than the maximum amount of data calculated. This ensures that the number of data records to be written in a single request does not exceed 10,000. For example, if the data size of a single data record is 10 KB, the value of this parameter must be less than the result of 10 multiplied by 10,000.
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "" // The maximum number of dirty data records allowed.
        },
        "speed": {
            "throttle": true, // Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "concurrent": 20, // The maximum number of parallel threads.
            "mbps": "12" // The maximum transmission rate. Unit: MB/s.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Parameters in code for DataHub Writer

  • accessId: The AccessKey ID that is used to connect to DataHub. Required: Yes. No default value.

  • accessKey: The AccessKey secret that is used to connect to DataHub. Required: Yes. No default value.

  • endPoint: The endpoint of DataHub. Required: Yes. No default value.

  • maxRetryCount: The maximum number of retries if the synchronization task fails. Required: No. No default value.

  • mode: The mode for writing strings. Required: Yes. No default value.

  • parseContent: The data to be parsed. Required: Yes. No default value.

  • project: The basic organizational unit of data in DataHub. Each project has one or more topics. DataHub projects are independent of MaxCompute projects. You cannot use MaxCompute projects as DataHub projects. Required: Yes. No default value.

  • topic: The minimum unit for data subscription and publishing. You can use topics to distinguish different types of streaming data. Required: Yes. No default value.

  • maxCommitSize: The maximum amount of buffered data that Data Integration can accumulate before it commits the data to the destination. You can specify this parameter to improve write efficiency. Unit: bytes. DataHub allows a maximum of 10,000 data records to be written in a single request. If the number of data records exceeds 10,000, the synchronization task fails. In this case, the maximum amount of data that can be written in a single request is calculated by using the following formula: Average size of a single data record × 10,000. Set maxCommitSize to a value less than this result to ensure that the number of data records written in a single request does not exceed 10,000. Required: No. Default value: 1048576 (1 MB).
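
The maxCommitSize setting is what keeps a single request under the 10,000-record limit described above. As a rough sketch of the sizing logic, assume that a single data record averages about 10 KB: the formula gives an upper bound of about 10 KB × 10,000 = 100,000 KB per request, so maxCommitSize must stay below that value. The data source and topic names below are placeholders, and the values are illustrative rather than recommended settings.

{
    "stepType": "datahub",
    "parameter": {
        "datasource": "my_datahub_source", // Placeholder name of a DataHub data source added in DataWorks.
        "topic": "my_topic", // Placeholder name of a TUPLE topic.
        "maxRetryCount": 500,
        "maxCommitSize": 10485760 // 10 MB, well below the roughly 100,000 KB bound for records that average 10 KB.
    },
    "name": "Writer",
    "category": "writer"
}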