The DataHub data source connects DataWorks to DataHub, a real-time data streaming service. It supports batch and real-time synchronization in both directions, enabling you to move data between DataHub and other data stores in your DataWorks pipelines.
Supported synchronization modes
| Mode | Direction | Configuration entry |
|---|---|---|
| Batch synchronization | Read and write | Codeless UI or code editor |
| Real-time synchronization | Read and write | DataStudio |
| Full-database real-time synchronization | Write only | DataStudio |
Supported data types
DataHub supports only the following data types: BIGINT, STRING, BOOLEAN, DOUBLE, TIMESTAMP, and DECIMAL.
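Since only these six types are accepted, it can help to check a topic schema before you create a synchronization task. Below is a minimal sketch of such a check; the field names and the `validate_schema` helper are illustrative, not part of any DataHub SDK.

```python
# Sketch: check a topic schema against the DataHub-supported types listed
# above. Field names and this helper are illustrative only.

SUPPORTED_TYPES = {"BIGINT", "STRING", "BOOLEAN", "DOUBLE", "TIMESTAMP", "DECIMAL"}

def validate_schema(fields):
    """Return the (name, type) pairs whose type DataHub does not support."""
    return [(name, ftype) for name, ftype in fields
            if ftype.upper() not in SUPPORTED_TYPES]

schema = [("id", "BIGINT"), ("name", "STRING"), ("score", "FLOAT")]
print(validate_schema(schema))  # FLOAT is not a supported DataHub type
```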
Limitations
Batch synchronization
- The STRING type supports UTF-8 encoding only. Each STRING field can be up to 1 MB.
Real-time synchronization
- Serverless resource groups are supported.
- Records with the same hash value are routed to the same shard.
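The shard-routing behavior can be pictured as hash-then-modulo partitioning. The sketch below illustrates the idea only; the actual hash function and routing scheme DataHub uses are not specified here.

```python
# Illustrative hash-based shard routing: equal keys always hash to the
# same value, so they always land on the same shard. The MD5-plus-modulo
# scheme is an assumption for demonstration, not DataHub's actual routine.
import hashlib

def route_to_shard(partition_key: str, shard_count: int) -> int:
    digest = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % shard_count

# Identical keys are always routed identically:
assert route_to_shard("user-42", 4) == route_to_shard("user-42", 4)
```

The practical consequence is that per-key ordering is preserved within a shard, but a skewed key distribution can make one shard a hotspot.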
Full-database real-time synchronization
- Data can only be written to TUPLE-type topics. For more information, see Data types.
- When the task starts, a batch process writes all existing data to DataHub first. After the initial load completes, real-time incremental synchronization begins.
- Five additional fields are appended to each source table record. You can also add custom fields when configuring the task. For details on the message format, see Appendix: Message formats.
Prerequisites
Before you begin, ensure that you have:
- Added the DataHub data source to DataWorks. For instructions, see Data source management.
Configure a synchronization task
Choose the guide that matches your synchronization mode:
Batch synchronization
- Use the codeless UI
- Use the code editor
For the full parameter reference and script examples for the code editor, see Appendix: Code and parameters.
Real-time synchronization
Follow the steps in Configure a real-time synchronization task in DataStudio.
Note: For information about supported operations for different DataHub data types, sharding policies, data formats, and sample messages, see Appendix: Message formats.
Full-database real-time synchronization
Follow the steps in Configure a real-time full-database synchronization task.
FAQ
Appendix: Code and parameters
SDK versions
| Component | SDK version |
|---|---|
| DataHub Reader | aliyun-sdk-datahub 2.9.1 |
| DataHub Writer | aliyun-sdk-datahub 2.5.1 |
Both components use the DataHub Java SDK.
Reader script example
```json
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "datahub",
            "parameter": {
                "endpoint": "<Required>",
                "accessId": "<Required>",
                "accessKey": "<Required>",
                "project": "<Required>",
                "topic": "<Required>",
                "beginDateTime": "<Required - yyyyMMddHHmmss>",
                "endDateTime": "<Required - yyyyMMddHHmmss>",
                "batchSize": 1000,
                "column": [
                    "col0",
                    "col1",
                    "col2",
                    "col3",
                    "col4"
                ]
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,
            "concurrent": 1,
            "mbps": "12"
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
```
Reader parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| endpoint | string | Yes | — | The DataHub endpoint. |
| accessId | string | Yes | — | The AccessKey ID used to access DataHub. |
| accessKey | string | Yes | — | The AccessKey secret used to access DataHub. |
| project | string | Yes | — | The DataHub project name. A project is the resource management unit in DataHub for isolation and access control. |
| topic | string | Yes | — | The DataHub topic name. |
| beginDateTime | string | Yes | — | The start time for data consumption, in yyyyMMddHHmmss format. The range is inclusive. Must be used together with endDateTime. To read data incrementally, combine with DataWorks scheduling parameters: for example, set the parameter name to bizdate, the value to $[yyyymmdd-1], and beginDateTime to ${bizdate}000000. |
| endDateTime | string | Yes | — | The end time for data consumption, in yyyyMMddHHmmss format. The range is exclusive. Must be used together with beginDateTime. |
| batchSize | integer | No | 1,024 | The number of records to read per batch. |
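To make the beginDateTime/endDateTime convention concrete, the sketch below derives the yyyyMMddHHmmss window for the daily incremental pattern described above (bizdate set to $[yyyymmdd-1], beginDateTime set to ${bizdate}000000). The `daily_window` helper is illustrative, not part of DataWorks.

```python
# Sketch: compute the [inclusive, exclusive) time window for a daily
# incremental read, mirroring the bizdate = $[yyyymmdd-1] example.
from datetime import date, timedelta

def daily_window(run_day: date) -> tuple:
    """Return (beginDateTime, endDateTime) covering the previous full day."""
    bizdate = run_day - timedelta(days=1)          # $[yyyymmdd-1]
    begin = bizdate.strftime("%Y%m%d") + "000000"  # ${bizdate}000000
    end = run_day.strftime("%Y%m%d") + "000000"    # exclusive upper bound
    return begin, end

print(daily_window(date(2024, 5, 2)))  # ('20240501000000', '20240502000000')
```

Because endDateTime is exclusive, consecutive daily windows tile the timeline without overlap or gaps.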
Writer script example
```json
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",
            "parameter": {
                "datasource": "<Required>",
                "topic": "<Required>",
                "maxRetryCount": 500,
                "maxCommitSize": 1048576
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": true,
            "concurrent": 20,
            "mbps": "12"
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
```
Writer parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| datasource | string | Yes | — | The name of the DataHub data source configured in DataWorks. |
| topic | string | Yes | — | The DataHub topic name. A topic is the basic unit for data publication and subscription in DataHub. |
| project | string | Yes | — | The DataHub project name. DataHub projects are independent of MaxCompute projects and cannot be shared between the two services. |
| accessId | string | Yes | — | The DataHub AccessKey ID. |
| accessKey | string | Yes | — | The DataHub AccessKey secret. |
| endPoint | string | Yes | — | The DataHub service endpoint. |
| mode | string | Yes | — | The write mode for STRING values. |
| parseContent | boolean | Yes | — | Whether to parse record content before writing. |
| maxRetryCount | integer | No | — | The maximum number of retries on task failure. |
| maxCommitSize | integer | No | 1 MB (1,048,576 bytes) | The buffer size for batched writes. DataHub allows a maximum of 10,000 records per request. Set this value to less than average record size × 10,000 to avoid request size errors. |
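The maxCommitSize sizing rule above is simple arithmetic: the buffer must flush before it could ever hold more than 10,000 records. A minimal sketch, where the helper name and the 80% safety margin are assumptions for illustration:

```python
# Sketch: pick a maxCommitSize below (average record size x 10,000) so a
# flushed batch never exceeds DataHub's 10,000-records-per-request limit.
# The 0.8 safety margin is an assumption, not a documented recommendation.
def recommended_max_commit_size(avg_record_bytes: int, margin: float = 0.8) -> int:
    """Largest safe buffer size in bytes for the given average record size."""
    return int(avg_record_bytes * 10_000 * margin)

# With 200-byte records the hard ceiling is 2,000,000 bytes; with margin:
print(recommended_max_commit_size(200))  # 1600000
```

Note that the 1 MB default is already safe for records averaging a little over 100 bytes; larger records leave even more headroom.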