The DataHub data source provides a bidirectional channel for reading data from and writing data to DataHub, enabling efficient processing of large-scale data. This topic describes the data synchronization capabilities that DataWorks provides for DataHub.
Supported versions
- DataHub Reader uses the DataHub SDK for Java to read data from DataHub. The reader depends on the following SDK version:
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.9.1</version>
</dependency>
- DataHub Writer uses the DataHub SDK for Java to write data to DataHub. The writer depends on the following SDK version:
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.5.1</version>
</dependency>
Limitations
Batch read/write
The STRING data type supports only UTF-8 encoding. A single STRING column cannot exceed 1 MB.
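The two STRING limits can be checked before a batch task runs. The following is a minimal sketch, not part of DataWorks: the helper name `fits_string_column` is hypothetical, and it assumes that 1 MB means 1,048,576 bytes of UTF-8 encoded data (the same value this page uses for the maxCommitSize default). The byte length, not the character count, is what matters, because non-ASCII characters take more than one byte in UTF-8.

```python
# Hypothetical pre-check for values destined for a DataHub STRING column.
# Assumption: the 1 MB limit is 1,048,576 bytes of UTF-8 encoded data.
MAX_STRING_BYTES = 1024 * 1024


def fits_string_column(value: str) -> bool:
    """Return True if `value` can be UTF-8 encoded and is at most 1 MB."""
    try:
        # Unpaired surrogates cannot be represented in UTF-8 at all.
        encoded = value.encode("utf-8")
    except UnicodeEncodeError:
        return False
    return len(encoded) <= MAX_STRING_BYTES


print(fits_string_column("hello"))               # True
print(fits_string_column("a" * (1024 * 1024)))   # True: exactly 1 MB
print(fits_string_column("é" * 600_000))         # False: 1,200,000 bytes in UTF-8
```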
Real-time read/write
- Real-time synchronization tasks support serverless resource groups.
- When you synchronize data to DataHub in real time, data is sharded based on the hash value of the source data. Records with the same hash value are written to the same shard.
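The sharding rule means that placement is a deterministic function of the hash, so related records stay together on one shard. The sketch below only illustrates that idea: this page does not specify which hash function DataWorks uses or which source columns feed it, so `zlib.crc32` over a single key string and the function name `shard_for` are assumptions made for illustration.

```python
import zlib


def shard_for(key: str, shard_count: int) -> int:
    """Hypothetical: map a record key to a shard index in [0, shard_count)."""
    if shard_count <= 0:
        raise ValueError("shard_count must be positive")
    # CRC32 is deterministic across processes. Python's built-in hash() is
    # salted per process for str values, so it would not give stable shards.
    return zlib.crc32(key.encode("utf-8")) % shard_count


# Hypothetical source records keyed by order ID.
records = [("order-1", "created"), ("order-2", "created"), ("order-1", "paid")]
for key, event in records:
    print(key, event, "-> shard", shard_for(key, 4))
# Both "order-1" events print the same shard index.
```

Because the shard depends only on the hash, all changes to one key arrive on one shard, which is what preserves their relative order for downstream consumers.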
Real-time full-database write
When the task runs, a batch synchronization task first writes the full data to DataHub. A real-time synchronization task then starts and synchronizes incremental data from the source to the destination. Data is written according to the following rules:
- Data can be written only to DataHub topics of the TUPLE type. For more information about the TUPLE data type, see Data types.
- When data is synchronized to DataHub in real time, five additional fields are appended to the source table fields. You can also add other fields when you configure the task. For the final format of the messages sent to DataHub, see Appendix: DataHub message formats.
Supported field types
When data is synchronized to DataHub, source values are mapped to the corresponding DataHub field types. DataHub supports only the following data types: BIGINT, STRING, BOOLEAN, DOUBLE, TIMESTAMP, and DECIMAL.
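Because only six field types are supported, it can help to validate a planned topic schema before configuring the task. This is a minimal sketch: the type list comes from this page, while the function `unsupported_fields` and the sample schema (including the `ARRAY` type) are hypothetical inputs for illustration.

```python
# Field types that DataHub supports, as listed on this page.
SUPPORTED_TYPES = frozenset(
    {"BIGINT", "STRING", "BOOLEAN", "DOUBLE", "TIMESTAMP", "DECIMAL"}
)


def unsupported_fields(schema: dict[str, str]) -> list[str]:
    """Return the names of fields whose declared type DataHub does not support."""
    return [name for name, ftype in schema.items()
            if ftype.upper() not in SUPPORTED_TYPES]


# Hypothetical target schema: field name -> declared type.
schema = {"id": "BIGINT", "name": "STRING", "tags": "ARRAY", "price": "decimal"}
print(unsupported_fields(schema))  # ['tags']
```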
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data source management. When you add the data source, refer to the parameter descriptions in the DataWorks console to understand what each parameter means.
Develop a data synchronization task
For the entry point and the procedure for configuring a synchronization task, see the following configuration guides.
Configure a single-table batch synchronization task
- For the procedure, see Configure a batch synchronization task in wizard mode and Configure a batch synchronization task in script mode.
- For all parameters and script demos for script mode configuration, see Appendix: Script demos and parameter description below.
Configure a single-table real-time synchronization task
For the procedure, see Configure a real-time synchronization task.
For information about operations supported by different DataHub data types, sharding strategies, data formats, and related message examples, see Appendix: DataHub message formats.
Configure a real-time full-database synchronization task
For the procedure, see Configure a real-time full-database synchronization task.
FAQ
Appendix: Script demos and parameter description
Configure a batch synchronization task by using the code editor
To configure a batch synchronization task in the code editor, you must configure the parameters in the script according to the unified script format requirements. For more information, see Use the Code Editor. The following sections describe the data source parameters that you must configure when you configure a batch synchronization task in the code editor.
Reader script demo
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"job": {
"content": [
{
"reader": {
"name": "datahubreader",
"parameter": {
"endpoint": "xxx", // The endpoint of DataHub.
"accessId": "xxx", // The AccessKey ID used to access DataHub.
"accessKey": "xxx", // The AccessKey secret used to access DataHub.
"project": "xxx", // The name of the DataHub project.
"topic": "xxx", // The name of the DataHub topic.
"batchSize": 1000, // The number of records to read at a time.
"beginDateTime": "20180910111214", // The start time for data consumption (inclusive), in yyyyMMddHHmmss format.
"endDateTime": "20180910111614", // The end time for data consumption (exclusive), in yyyyMMddHHmmss format.
"column": [
"col0",
"col1",
"col2",
"col3",
"col4"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of error records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. If set to true, throttling is enabled. If set to false, throttling is disabled and the mbps parameter does not take effect.
"concurrent":1,// The concurrency.
"mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader script parameters
| Parameter | Description | Required |
| --- | --- | --- |
| endpoint | The endpoint of DataHub. | Yes |
| accessId | The AccessKey ID used to access DataHub. | Yes |
| accessKey | The AccessKey secret used to access DataHub. | Yes |
| project | The name of the DataHub project. A project is the basic resource management unit in DataHub and is used for resource isolation and access control. | Yes |
| topic | The name of the DataHub topic. | Yes |
| batchSize | The number of records to read at a time. Default value: 1,024. | No |
| beginDateTime | The start time for data consumption. This parameter specifies the left boundary (inclusive) of the time range and must be a time string in the yyyyMMddHHmmss format. To implement incremental synchronization, you can use this parameter together with DataWorks scheduling parameters. For example, set the name of the node scheduling parameter to bizdate and its value to $[yyyymmdd-1], and then set beginDateTime to ${bizdate}000000, which sets the start time for data consumption to 00:00:00 of the previous day. Note: beginDateTime and endDateTime must be used together. | Yes |
| endDateTime | The end time for data consumption. This parameter specifies the right boundary (exclusive) of the time range and must be a time string in the yyyyMMddHHmmss format. To implement incremental synchronization, you can use this parameter together with DataWorks scheduling parameters. For example, set the name of the node scheduling parameter to bizdate and its value to $[yyyymmdd-1], and then set endDateTime to ${bizdate}235959, which sets the end time for data consumption to 23:59:59 of the previous day. Note: beginDateTime and endDateTime must be used together. | Yes |
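The beginDateTime and endDateTime examples above can be reproduced outside DataWorks to see which records a daily run covers. This is a sketch under stated assumptions: it assumes $[yyyymmdd-1] resolves to the day before the run date, and the helper names `previous_day_window` and `in_window` are hypothetical. It applies the documented boundaries literally (begin inclusive, end exclusive), which means a record stamped exactly 23:59:59 falls outside a window that ends at ${bizdate}235959.

```python
from datetime import date, datetime, timedelta

TIME_FORMAT = "%Y%m%d%H%M%S"  # yyyyMMddHHmmss


def previous_day_window(run_date: date) -> tuple[str, str]:
    """Build ${bizdate}000000 and ${bizdate}235959, assuming bizdate is run_date - 1 day."""
    bizdate = (run_date - timedelta(days=1)).strftime("%Y%m%d")
    return bizdate + "000000", bizdate + "235959"


def in_window(ts: datetime, begin: str, end: str) -> bool:
    """Left boundary inclusive, right boundary exclusive."""
    start = datetime.strptime(begin, TIME_FORMAT)
    stop = datetime.strptime(end, TIME_FORMAT)
    return start <= ts < stop


begin, end = previous_day_window(date(2018, 9, 11))
print(begin, end)  # 20180910000000 20180910235959
print(in_window(datetime(2018, 9, 10, 0, 0, 0), begin, end))     # True
print(in_window(datetime(2018, 9, 10, 23, 59, 59), begin, end))  # False: end is exclusive
```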
Writer script demo
{
"type": "job",
"version": "2.0",// The version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",// The plug-in name.
"parameter": {
"datasource": "",// The name of the DataHub data source added in DataWorks.
"topic": "",// A topic is the smallest unit for subscription and publishing in DataHub. You can use a topic to represent a type or category of streaming data.
"maxRetryCount": 500,// The maximum number of retries upon task failure.
"maxCommitSize": 1048576// The data is committed in batches to the destination when the accumulated data buffer reaches the maxCommitSize (in bytes).
// DataHub accepts at most 10,000 records per request, and exceeding this limit causes a task error. Set this parameter based on the average record size multiplied by 10,000. For example, if each record is 10 KB, set this parameter to a value lower than 10 KB × 10,000 = 100,000 KB.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// The maximum number of error records allowed.
},
"speed": {
"throttle":true,// Specifies whether to enable throttling. If set to true, throttling is enabled. If set to false, throttling is disabled and the mbps parameter does not take effect.
"concurrent":20, // The concurrency.
"mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer script parameters
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| accessId | The AccessKey ID used to access DataHub. | Yes | N/A |
| accessKey | The AccessKey secret used to access DataHub. | Yes | N/A |
| endPoint | The endpoint used to access DataHub. Select the correct domain name based on the service to which the resource belongs. | Yes | N/A |
| maxRetryCount | The maximum number of retries upon task failure. | No | N/A |
| mode | The write mode for values of the STRING type. | Yes | N/A |
| parseContent | The content to parse. | Yes | N/A |
| project | A project is the basic organizational unit for DataHub data and contains multiple topics. Note: DataHub projects are independent of MaxCompute projects. Projects created in MaxCompute cannot be reused in DataHub, so you must create DataHub projects separately. | Yes | N/A |
| topic | A topic is the smallest unit for subscription and publishing in DataHub. You can use a topic to represent a type or category of streaming data. | Yes | N/A |
| maxCommitSize | To improve write efficiency, DataX buffers data and commits it to the destination in batches when the buffered data reaches maxCommitSize bytes. DataHub accepts at most 10,000 records per request, and exceeding this limit causes a task error. To keep each request within this limit, set maxCommitSize based on the average record size multiplied by 10,000. | No | 1,048,576 bytes (1 MB) |
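The maxCommitSize guidance is simple arithmetic: a full buffer holds roughly maxCommitSize divided by the average record size, and that count must stay below 10,000. The sketch below shows the calculation; the function names are hypothetical, and it assumes that the buffer is measured in the same bytes as the average record size, so treat the result as an estimate rather than an exact record count. Note that the default of 1 MB is safe for 10 KB records but not for very small records.

```python
MAX_RECORDS_PER_REQUEST = 10_000      # DataHub per-request record limit
DEFAULT_MAX_COMMIT_SIZE = 1_048_576   # documented default for maxCommitSize (1 MB)


def commit_size_limit(avg_record_bytes: int) -> int:
    """Buffer size in bytes at which a batch would hold about 10,000 records."""
    if avg_record_bytes <= 0:
        raise ValueError("avg_record_bytes must be positive")
    return avg_record_bytes * MAX_RECORDS_PER_REQUEST


def is_safe_commit_size(max_commit_size: int, avg_record_bytes: int) -> bool:
    """True if a full buffer is expected to hold fewer than 10,000 records."""
    return max_commit_size < commit_size_limit(avg_record_bytes)


print(commit_size_limit(10 * 1024))                             # 102400000
print(is_safe_commit_size(DEFAULT_MAX_COMMIT_SIZE, 10 * 1024))  # True
print(DEFAULT_MAX_COMMIT_SIZE // 100)                           # 10485 records of 100 bytes
print(is_safe_commit_size(DEFAULT_MAX_COMMIT_SIZE, 100))        # False
```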