DataHub is a real-time data distribution platform that is designed to process streaming data. You can publish and subscribe to streaming data in DataHub, distribute the data to other platforms, analyze it, and build applications on top of it. This topic describes how to synchronize data from a PolarDB for MySQL cluster to a DataHub instance by using Data Transmission Service (DTS). After you synchronize data to DataHub, you can use big data services such as Realtime Compute for Apache Flink to analyze the data in real time.

Prerequisites

  • The DataHub instance resides in the China (Hangzhou), China (Shanghai), China (Beijing), or China (Shenzhen) region.
  • A DataHub project is created to receive the synchronized data. For more information, see Create a project.
  • The binary logging feature is enabled for the PolarDB for MySQL cluster. For more information, see Enable binary logging.
  • The tables to be synchronized from the PolarDB for MySQL cluster have PRIMARY KEY or UNIQUE constraints. Queries that check both prerequisites are shown after this list.
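
The following statements are a minimal sketch for verifying the last two prerequisites on the source cluster. The database name your_database is a placeholder; binary logging for PolarDB is enabled in the console, so the SHOW VARIABLES statement only confirms the current state.

    -- Confirm that binary logging is enabled (the Value column should show ON).
    SHOW VARIABLES LIKE 'log_bin';

    -- List tables in your_database that have neither a PRIMARY KEY nor a UNIQUE constraint.
    SELECT t.table_schema, t.table_name
    FROM information_schema.tables AS t
    LEFT JOIN information_schema.table_constraints AS c
           ON c.table_schema = t.table_schema
          AND c.table_name = t.table_name
          AND c.constraint_type IN ('PRIMARY KEY', 'UNIQUE')
    WHERE t.table_type = 'BASE TABLE'
      AND t.table_schema = 'your_database'
      AND c.constraint_name IS NULL;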

Limits

  • Initial full data synchronization is not supported. DTS does not synchronize historical data of the required objects from the source PolarDB cluster to the destination DataHub instance.
  • You can select only tables as the objects to be synchronized.
  • After a data synchronization task is started, if you add columns to the tables in the source PolarDB cluster, DTS does not synchronize the new columns to the destination DataHub instance.
  • We recommend that you do not perform data definition language (DDL) operations on the required objects during data synchronization. Otherwise, data synchronization may fail.

SQL operations that can be synchronized

INSERT, UPDATE, and DELETE
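
For example, the following statements on a hypothetical orders table are all captured and synchronized as incremental log entries, assuming orders is one of the objects to be synchronized:

    INSERT INTO orders (id, name, address) VALUES (1, 'Alice', 'Hangzhou');
    UPDATE orders SET address = 'Shanghai' WHERE id = 1;
    DELETE FROM orders WHERE id = 1;

DDL statements such as ALTER TABLE are not in this list and, as noted in the Limits section, may cause the data synchronization task to fail.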

Procedure

  1. Purchase a data synchronization instance. For more information, see Purchase a DTS instance.
    Note On the buy page, set Source Instance to PolarDB, set Destination Instance to DataHub, and set Synchronization Topology to One-way Synchronization.
  2. Log on to the DTS console.
  3. In the left-side navigation pane, click Data Synchronization.
  4. At the top of the Synchronization Tasks page, select the region where the destination instance resides.
  5. Find the data synchronization instance and click Configure Synchronization Channel in the Actions column.
  6. Configure the source and destination instances.
    Synchronization Task Name: DTS automatically generates a task name. We recommend that you specify an informative name to identify the task. The task name does not have to be unique.
    Source Instance Details:
    • Instance Type: The value of this parameter is set to PolarDB Instance and cannot be changed.
    • Instance Region: The source region that you selected on the buy page. You cannot change the value of this parameter.
    • PolarDB Instance ID: Select the ID of the source PolarDB cluster.
    • Database Account: Enter the database account of the source PolarDB cluster. A sketch of how such an account can be created is shown after this step.
    • Database Password: Enter the password of the database account.
    Destination Instance Details:
    • Instance Type: The value of this parameter is set to DataHub and cannot be changed.
    • Instance Region: The destination region that you selected on the buy page. You cannot change the value of this parameter.
    • Project: Select the name of the DataHub project.
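    The following statements are a minimal sketch of creating a dedicated account for the task. The account name dts_sync, the password, and the database name your_database are placeholders, and the exact privileges that DTS requires depend on the source database type, so check the DTS documentation for your scenario.

        -- Create a dedicated synchronization account (placeholder name and password).
        CREATE USER 'dts_sync'@'%' IDENTIFIED BY 'YourStrongPassword1!';
        -- Read access to the objects to be synchronized.
        GRANT SELECT ON your_database.* TO 'dts_sync'@'%';
        -- Replication privileges that are commonly required to read incremental (binary log) data.
        GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dts_sync'@'%';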
  7. In the lower-right corner of the page, click Set Whitelist and Next.
    Note
    • You do not need to modify the security settings for ApsaraDB instances (such as ApsaraDB RDS for MySQL and ApsaraDB for MongoDB) and ECS-hosted databases. DTS automatically adds the CIDR blocks of DTS servers to the whitelists of ApsaraDB instances or the security group rules of Elastic Compute Service (ECS) instances. For more information, see Add the CIDR blocks of DTS servers to the security settings of on-premises databases.
    • After data synchronization is complete, we recommend that you remove the CIDR blocks of DTS servers from the whitelists or security groups.
  8. Select the synchronization policy and the objects to be synchronized.
    • Initial Synchronization: Select Initial Schema Synchronization.
      Note After you select Initial Schema Synchronization, DTS synchronizes the schemas of the required objects (such as tables) to the destination DataHub instance.
    • Select the objects to be synchronized: Select one or more objects from the Available section and click the rightwards arrow to move the objects to the Selected section.
      Note
      • You can select only tables as the objects to be synchronized.
      • By default, the name of an object remains unchanged after it is synchronized to the destination instance. You can use the object name mapping feature to rename the objects that are synchronized to the destination instance. For more information, see Rename an object to be synchronized.
    • Whether to enable the new naming rules for additional columns: After DTS synchronizes data to DataHub, DTS adds additional columns to the destination topic. If the names of additional columns are the same as the names of existing columns in the destination topic, data synchronization fails. Select Yes or No to specify whether to enable the new naming rules for additional columns.
      Warning Before you specify this parameter, check whether the additional columns and the existing columns in the destination topic have name conflicts. For more information, see Modify the naming rules for additional columns.
    • Rename Databases and Tables: You can use the object name mapping feature to rename the objects that are synchronized to the destination instance. For more information, see Object name mapping.
    • Replicate Temporary Tables When DMS Performs DDL Operations: If you use Data Management (DMS) to perform online DDL operations on the source database, you can specify whether to synchronize the temporary tables that are generated by these operations.
      • Yes: DTS synchronizes the data of temporary tables generated by online DDL operations.
        Note If online DDL operations generate a large amount of data, the data synchronization task may be delayed.
      • No: DTS does not synchronize the data of temporary tables generated by online DDL operations. Only the original DDL statements that are executed on the source database are synchronized.
        Note If you select No, the tables in the destination database may be locked.
    • Retry Time for Failed Connections: By default, if DTS fails to connect to the source or destination database, DTS retries the connection within the next 720 minutes (12 hours). You can specify the retry time based on your needs. If DTS reconnects to the source and destination databases within the specified time, DTS resumes the data synchronization task. Otherwise, the task fails.
      Note You are charged for the DTS instance while DTS retries connections. We recommend that you specify the retry time based on your business needs and release the DTS instance at your earliest opportunity after the source and destination instances are released.
  9. Optional: In the Selected section, move the pointer over the destination topic and click Edit. In the dialog box that appears, specify the shard key, which is used for partitioning.
  10. In the lower-right corner of the page, click Precheck.
    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.
    • If the task fails to pass the precheck, you can click the info icon next to each failed item to view details.
      • After you troubleshoot the issues based on the causes, you can run a precheck again.
      • If you do not need to troubleshoot the issues, you can ignore failed items and run a precheck again.
  11. Close the Precheck dialog box after the following message is displayed: The precheck is passed. Then, the data synchronization task starts.
  12. Wait until initial synchronization is completed and the data synchronization task enters the Synchronizing state.
    You can view the state of the data synchronization task on the Synchronization Tasks page.

Schema of a DataHub topic

When DTS synchronizes incremental data to a DataHub topic, DTS adds additional columns to store metadata. The following sketch illustrates the schema of a DataHub topic.
Note In this example, id, name, and address are data fields. DTS adds the dts_ prefix to the data fields because the previous naming rules for additional columns are used.
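
The following sketch is reconstructed from the note above and the column descriptions below. The data fields come from a hypothetical source table; the additional columns are all of the String type:

    dts_id              (data field)
    dts_name            (data field)
    dts_address         (data field)
    dts_record_id       (additional column, String)
    dts_operation_flag  (additional column, String)
    dts_instance_id     (additional column, String)
    dts_db_name         (additional column, String)
    dts_table_name      (additional column, String)
    dts_utc_timestamp   (additional column, String)
    dts_before_flag     (additional column, String)
    dts_after_flag      (additional column, String)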

The following list describes the additional columns in the DataHub topic. Each entry shows the previous column name, the new column name, and the data type.

  • dts_record_id / new_dts_sync_dts_record_id (String): The unique ID of the incremental log entry.
    Note
    • By default, the ID auto-increments for each new log entry. In disaster recovery scenarios, rollback may occur, and the ID may not auto-increment. Therefore, some IDs may be duplicated.
    • If an UPDATE operation is performed, DTS generates two incremental log entries to record the pre-update and post-update values. The values of the dts_record_id field in the two incremental log entries are the same.
  • dts_operation_flag / new_dts_sync_dts_operation_flag (String): The operation type. Valid values:
    • I: an INSERT operation
    • D: a DELETE operation
    • U: an UPDATE operation
  • dts_instance_id / new_dts_sync_dts_instance_id (String): The server ID of the database.
  • dts_db_name / new_dts_sync_dts_db_name (String): The name of the database.
  • dts_table_name / new_dts_sync_dts_table_name (String): The name of the table.
  • dts_utc_timestamp / new_dts_sync_dts_utc_timestamp (String): The operation timestamp, in UTC. It is also the timestamp of the binary log file.
  • dts_before_flag / new_dts_sync_dts_before_flag (String): Indicates whether the column values are pre-update values. Valid values: Y and N.
  • dts_after_flag / new_dts_sync_dts_after_flag (String): Indicates whether the column values are post-update values. Valid values: Y and N.
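
For example, assuming the previous naming rules are used and the hypothetical statement INSERT INTO orders (id, name, address) VALUES (1, 'Alice', 'Hangzhou') is executed on the source database, the resulting incremental record could look like the following sketch (all values are illustrative):

    dts_id: 1    dts_name: Alice    dts_address: Hangzhou
    dts_record_id: 100001
    dts_operation_flag: I
    dts_instance_id: 12345678
    dts_db_name: your_database
    dts_table_name: orders
    dts_utc_timestamp: 1575360000
    dts_before_flag: N
    dts_after_flag: Y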

Additional information about the dts_before_flag and dts_after_flag fields

The values of the dts_before_flag and dts_after_flag fields in an incremental log entry vary with different operation types:

  • INSERT

    For an INSERT operation, the column values are the newly inserted record values (post-update values). The value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.

  • UPDATE

    DTS generates two incremental log entries for an UPDATE operation. The two incremental log entries have the same values for the dts_record_id, dts_operation_flag, and dts_utc_timestamp fields.

    The first log entry records the pre-update values. Therefore, the value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N. The second log entry records the post-update values. Therefore, the value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y. For a worked example, see the sketch after this list.

  • DELETE

    For a DELETE operation, the column values are the deleted record values (pre-update values). The value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N.

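
As a worked example, assume the previous naming rules and the hypothetical statement UPDATE orders SET address = 'Shanghai' WHERE id = 1 on a row whose current address is Hangzhou. DTS generates two incremental log entries; the record ID and timestamp values below are illustrative, and, as described above, they are identical in both entries:

    Entry 1 (pre-update):  dts_id: 1   dts_address: Hangzhou
                           dts_record_id: 100002   dts_operation_flag: U
                           dts_utc_timestamp: 1575360100
                           dts_before_flag: Y   dts_after_flag: N
    Entry 2 (post-update): dts_id: 1   dts_address: Shanghai
                           dts_record_id: 100002   dts_operation_flag: U
                           dts_utc_timestamp: 1575360100
                           dts_before_flag: N   dts_after_flag: Y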

What to do next

After you configure the data synchronization task, you can use Realtime Compute for Apache Flink to analyze the data that is synchronized to the DataHub instance. For more information, see What is Alibaba Cloud Realtime Compute for Apache Flink?
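
For reference, the following Flink SQL statements are a minimal sketch of how a DataHub topic can be declared as a source table in Realtime Compute for Apache Flink. The table name, field names, endpoint, project, topic, and credential values are placeholders, and the connector option names may vary with the connector version, so check the Realtime Compute documentation before use.

    CREATE TEMPORARY TABLE orders_from_datahub (
      dts_id STRING,
      dts_name STRING,
      dts_address STRING,
      dts_operation_flag STRING,
      dts_before_flag STRING,
      dts_after_flag STRING
    ) WITH (
      'connector' = 'datahub',
      'endPoint' = '<datahub-endpoint>',
      'project' = '<your-project>',
      'topic' = '<your-topic>',
      'accessId' = '<your-access-key-id>',
      'accessKey' = '<your-access-key-secret>'
    );

    -- Example query: count synchronized records per operation type.
    SELECT dts_operation_flag, COUNT(*) AS record_count
    FROM orders_from_datahub
    GROUP BY dts_operation_flag;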