DataHub is a real-time data distribution platform designed to process streaming data. You can publish and subscribe to streaming data in DataHub, distribute the data to other platforms, analyze it, and build applications based on it. This topic describes how to synchronize data from an Apsara PolarDB for MySQL cluster to a DataHub instance by using Data Transmission Service (DTS). After the data is synchronized to DataHub, you can use big data services such as Realtime Compute to analyze it in real time.
Prerequisites
- The DataHub instance resides in the China (Hangzhou), China (Shanghai), China (Beijing), or China (Shenzhen) region.
- A DataHub project is created to receive the synchronized data. For more information, see Create a project.
- The binary logging feature is enabled for the Apsara PolarDB for MySQL cluster. For more information, see Enable binary logging.
- The tables to be synchronized from the Apsara PolarDB for MySQL cluster have PRIMARY KEY or UNIQUE constraints. You can verify both cluster-side prerequisites by using the query sketch after this list.
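The following is a minimal sketch of those two checks in Python, using the pymysql package. The endpoint, credentials, and the database name mydb are placeholders for your own values, not values from this topic.

```python
# Pre-flight checks for the two cluster-side prerequisites, as a minimal
# sketch using the pymysql package. The endpoint, credentials, and the
# database name "mydb" are placeholders, not values from this topic.
import pymysql

conn = pymysql.connect(
    host="pc-example.mysql.polardb.rds.aliyuncs.com",  # placeholder endpoint
    user="dts_user",
    password="****",
    database="mydb",
)
try:
    with conn.cursor() as cur:
        # 1. Binary logging must be enabled for DTS to read incremental data.
        cur.execute("SHOW VARIABLES LIKE 'log_bin'")
        print(cur.fetchone())  # expected: ('log_bin', 'ON')

        # 2. Every table to be synchronized needs a PRIMARY KEY or UNIQUE
        #    constraint; this lists the tables that have neither.
        cur.execute("""
            SELECT t.table_name
            FROM information_schema.tables t
            LEFT JOIN information_schema.table_constraints c
              ON  c.table_schema = t.table_schema
              AND c.table_name   = t.table_name
              AND c.constraint_type IN ('PRIMARY KEY', 'UNIQUE')
            WHERE t.table_schema = 'mydb'
              AND t.table_type   = 'BASE TABLE'
              AND c.constraint_name IS NULL
        """)
        print(cur.fetchall())  # any table listed here must get a key first
finally:
    conn.close()
```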
Limits
- Initial full data synchronization is not supported. DTS does not synchronize historical data of the required objects from the source PolarDB cluster to the destination DataHub instance.
- You can select only tables as the objects to be synchronized.
- After a data synchronization task is started, DTS does not synchronize columns that are added to tables in the source PolarDB cluster to the destination DataHub instance.
- We recommend that you do not perform DDL operations on the required objects during data synchronization. Otherwise, data synchronization may fail.
SQL operations that can be synchronized
The INSERT, UPDATE, and DELETE operations can be synchronized.
Procedure
Schema of a DataHub topic
In this example, id, name, and address are data fields. DTS adds the dts_ prefix to data fields because the previous naming rules for additional columns are used.

The following table describes the additional columns in the DataHub topic.
Previous additional column name | New additional column name | Data type | Description |
---|---|---|---|
dts_record_id | new_dts_sync_dts_record_id | String | The ID of the incremental log entry. Note: The two incremental log entries that DTS generates for an UPDATE operation have the same record ID. |
dts_operation_flag | new_dts_sync_dts_operation_flag | String | The operation type. Valid values: I (INSERT), U (UPDATE), and D (DELETE). |
dts_instance_id | new_dts_sync_dts_instance_id | String | The server ID of the database. The value is set to null. To ensure database security, the actual value is not displayed. |
dts_db_name | new_dts_sync_dts_db_name | String | The name of the database. |
dts_table_name | new_dts_sync_dts_table_name | String | The name of the table. |
dts_utc_timestamp | new_dts_sync_dts_utc_timestamp | String | The operation timestamp, in UTC. It is also the timestamp of the binary log file. |
dts_before_flag | new_dts_sync_dts_before_flag | String | Indicates whether the column values are pre-update values. Valid values: Y and N. |
dts_after_flag | new_dts_sync_dts_after_flag | String | Indicates whether the column values are post-update values. Valid values: Y and N. |
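To see the additional columns in practice, you can read a few records back from the topic after the task starts. The following consumer sketch uses the DataHub Python SDK (pydatahub); the endpoint, credentials, project name, topic name, and shard ID are illustrative assumptions, and the dts_-prefixed data fields follow the naming described above.

```python
# A minimal consumer sketch using the DataHub Python SDK (pydatahub).
# The endpoint, credentials, project name "dts_project", topic name
# "polardb_sync", and shard ID "0" are assumptions for illustration.
from datahub import DataHub
from datahub.models import CursorType

dh = DataHub("<access_id>", "<access_key>",
             "https://dh-cn-hangzhou.aliyuncs.com")

project, topic, shard_id = "dts_project", "polardb_sync", "0"
schema = dh.get_topic(project, topic).record_schema

# Start from the oldest record in the shard and fetch a small batch.
cursor = dh.get_cursor(project, topic, shard_id, CursorType.OLDEST).cursor
result = dh.get_tuple_records(project, topic, shard_id, schema, cursor, 10)

for record in result.records:
    # The additional columns describe the change that produced this record.
    op     = record.get_value("dts_operation_flag")  # expected: I, U, or D
    before = record.get_value("dts_before_flag")     # Y if pre-update values
    after  = record.get_value("dts_after_flag")      # Y if post-update values
    print(op, before, after,
          record.get_value("dts_id"), record.get_value("dts_name"))
```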
Additional information about the dts_before_flag and dts_after_flag fields
The values of the dts_before_flag and dts_after_flag fields in an incremental log entry vary based on the operation type:
- INSERT: The column values are the newly inserted record values (post-update values). The value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.
- UPDATE: DTS generates two incremental log entries for an UPDATE operation. The two entries have the same values for the dts_record_id, dts_operation_flag, and dts_utc_timestamp fields. The first entry records the pre-update values, so the value of the dts_before_flag field is Y and the value of the dts_after_flag field is N. The second entry records the post-update values, so the value of the dts_before_flag field is N and the value of the dts_after_flag field is Y. (See the pairing sketch after this list.)
- DELETE: The column values are the deleted record values (pre-update values). The value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N.
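As a concrete illustration of the UPDATE case, the following sketch pairs the two incremental log entries that share a dts_record_id. The records are plain Python dicts for simplicity, and U is the assumed operation flag value for UPDATE.

```python
# Illustrative only: reassembling the two log entries that DTS generates for
# one UPDATE operation. Records are plain dicts here, and "U" is the assumed
# dts_operation_flag value for UPDATE.
from collections import defaultdict

def pair_updates(records):
    """Group UPDATE entries by dts_record_id into (before, after) pairs."""
    groups = defaultdict(dict)
    for r in records:
        if r["dts_operation_flag"] == "U":
            key = "before" if r["dts_before_flag"] == "Y" else "after"
            groups[r["dts_record_id"]][key] = r
    return {rid: (g.get("before"), g.get("after")) for rid, g in groups.items()}

records = [
    {"dts_record_id": "100", "dts_operation_flag": "U",
     "dts_before_flag": "Y", "dts_after_flag": "N", "dts_name": "old name"},
    {"dts_record_id": "100", "dts_operation_flag": "U",
     "dts_before_flag": "N", "dts_after_flag": "Y", "dts_name": "new name"},
]
before, after = pair_updates(records)["100"]
print(before["dts_name"], "->", after["dts_name"])  # old name -> new name
```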
What to do next
After you configure the data synchronization task, you can use Realtime Compute for Apache Flink to analyze the data that is synchronized to the DataHub instance. For more information, see What is Alibaba Cloud Realtime Compute for Apache Flink?