DataHub is a real-time data distribution platform that is designed to process streaming data. You can publish and subscribe to streaming data in DataHub and distribute the data to other platforms. DataHub allows you to analyze streaming data and build applications based on streaming data. This topic describes how to synchronize data from an ApsaraDB RDS for MySQL instance to a DataHub instance by using Data Transmission Service (DTS). After you synchronize data, you can use big data services such as Realtime Compute for Apache Flink to analyze the data in real time.

Prerequisites

Precautions

Category Description
Limits on the source database
  • The tables to synchronize must have PRIMARY KEY or UNIQUE constraints, and all fields must be unique. Otherwise, the destination database may contain duplicate data records.
  • If you select tables as the objects to synchronize and you want to edit tables (such as renaming tables or columns) in the destination database, up to 1,000 tables can be synchronized in a single data synchronization task. If you run a task to synchronize more than 1,000 tables, a request error occurs. In this case, we recommend that you split the tables and configure multiple tasks to synchronize the tables, or configure a task to synchronize the entire database.
  • The following requirements for binary logs must be met:
    • The binary logging feature is enabled. For more information about how to enable binary logging, see Modify the parameters of an ApsaraDB RDS for MySQL instance. In addition, the binlog_row_image parameter must be set to full. Otherwise, error messages are returned during precheck and the data synchronization task cannot be started.
      Notice
      • If the source database is a self-managed MySQL database, you must enable binary logging and set binlog_format to row and binlog_row_image to full.
      • If the self-managed MySQL database is deployed in a dual-primary cluster, you must set log_slave_updates to ON. This ensures that DTS can obtain all binary logs. For more information, see Create an account for a user-created MySQL database and configure binary logging.
    • For an incremental data synchronization task, the binary logs of the source database are retained for at least 24 hours. For a schema and incremental data synchronization task, the binary logs of the source database are retained for at least seven days. After the schema is synchronized, you can set the retention period to more than 24 hours. Otherwise, DTS may fail to obtain the binary logs and the task may fail. In exceptional circumstances, data inconsistency or loss may occur. Make sure that you set the retention period of binary logs in accordance with the preceding requirements. Otherwise, the SLA of DTS does not ensure service reliability and performance. For more information about binary log files and log backup files of an ApsaraDB RDS for MySQL instance, see View and delete the binary log files of an ApsaraDB RDS for MySQL instance.
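The binary log requirements above can be checked before you configure the task. The following is a minimal sketch, not part of DTS: the function name `check_binlog_settings` and its parameters are hypothetical, and it assumes that you have already collected the variable values (for example, from `SHOW GLOBAL VARIABLES` on the source database) and determined the retention period in hours yourself.

```python
# Hypothetical helper for illustration only; it is not a DTS API.

def check_binlog_settings(variables, retention_hours,
                          schema_sync=False, dual_primary=False):
    """Return a list of problems found in the source MySQL settings.

    variables       -- dict of MySQL global variable names to values
    retention_hours -- how long binary logs are kept, in hours
    schema_sync     -- True for a schema and incremental synchronization task
    dual_primary    -- True for a self-managed dual-primary cluster
    """
    def value(name):
        return str(variables.get(name, "")).upper()

    problems = []
    if value("log_bin") != "ON":
        problems.append("binary logging is not enabled")
    if value("binlog_format") != "ROW":
        problems.append("binlog_format must be set to row")
    if value("binlog_row_image") != "FULL":
        problems.append("binlog_row_image must be set to full")
    if dual_primary and value("log_slave_updates") != "ON":
        problems.append("log_slave_updates must be set to ON")

    # 24 hours for incremental-only tasks, seven days when schema
    # synchronization is also selected.
    required_hours = 7 * 24 if schema_sync else 24
    if retention_hours < required_hours:
        problems.append(
            "binary logs must be retained for at least %d hours" % required_hours
        )
    return problems


settings = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}
print(check_binlog_settings(settings, retention_hours=24, schema_sync=True))
```

An empty list means that the settings meet the requirements listed above. The DTS precheck remains the authoritative check.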

Other limits
  • Initial full data synchronization is not supported. DTS does not synchronize the historical data of required objects from the source ApsaraDB RDS for MySQL instance to the destination DataHub instance.
  • During data synchronization, we recommend that you use only DTS to write data to the destination database. This prevents data inconsistency between the source and destination databases. If you use tools other than DTS to write data to the destination database, data loss may occur in the destination database when you use DMS to perform online DDL operations.
Special cases
If the source database is a self-managed MySQL database, take note of the following items:
  • If you perform a primary/secondary switchover on the source database when the data synchronization task is running, the task fails.
  • DTS calculates synchronization latency based on the timestamp of the latest synchronized data in the destination database and the current timestamp in the source database. If no DML operation is performed on the source database for a long time, the synchronization latency may be inaccurate. If the synchronization latency is too high, you can perform a DML operation on the source database to update the latency.
    Note If you select an entire database as the object to synchronize, you can create a heartbeat table. The heartbeat table is updated or receives data every second.
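The latency behavior described above can be illustrated with a short sketch. It assumes only what this topic states: latency is the current timestamp in the source database minus the timestamp of the latest synchronized data. The function name `synchronization_latency` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def synchronization_latency(source_now, latest_synced_timestamp):
    """Current source time minus the timestamp of the latest synchronized data."""
    return source_now - latest_synced_timestamp


last_dml = datetime(2024, 1, 1, 12, 0, 0)
source_now = last_dml + timedelta(minutes=10)

# No DML for ten minutes: the reported latency grows although nothing is pending.
idle_latency = synchronization_latency(source_now, last_dml)

# A heartbeat row written one second ago refreshes the reference timestamp.
heartbeat_time = source_now - timedelta(seconds=1)
heartbeat_latency = synchronization_latency(source_now, heartbeat_time)

print(idle_latency, heartbeat_latency)
```

This is why a DML operation on the source database, or a heartbeat table that is updated every second, brings the displayed latency back to a realistic value.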

Supported synchronization topologies

  • One-way one-to-one synchronization
  • One-way one-to-many synchronization
  • One-way many-to-one synchronization
For more information, see Synchronization topologies.

SQL operations that can be synchronized

Operation type SQL statements
DML INSERT, UPDATE, and DELETE
Notice To synchronize a new column in the source table by using DTS, you must first add the column to the destination table in DataHub. Then, add the new column to the source table.

Procedure

Note This procedure is described based on the new version of the DTS console. In the event of discrepancies in operations between the console and the DTS section in the Data Management Service (DMS) console, the DMS console takes precedence.
  1. Go to the Data Synchronization page of the new DTS console.
    Note You can also log on to the Data Management console. In the top navigation bar, click DTS. Then, in the left-side navigation pane, choose DTS (DTS) > Data Synchronization.
  2. In the upper-left corner of the page, select the region where the data synchronization instance resides.
  3. Click Create Task. On the page that appears, configure the source and destination databases.
    Warning After you select the source and destination instances, we recommend that you read the Limits displayed in the upper part of the page. This helps ensure that the data synchronization task can be created and run as expected.
    Section Parameter Description
    None Task Name

    The task name that DTS automatically generates. We recommend that you specify a descriptive name that makes it easy to identify the task. You do not need to use a unique task name.

    Source Database Select Instance
    Specifies whether to use an existing instance based on your needs.
    • If you use an existing instance, DTS automatically applies the parameter settings of the instance.
    • If you do not use an existing instance, you must configure the following parameters.
    Database Type Select MySQL.
    Access Method Select Alibaba Cloud Instance.
    Instance Region Select the region in which the source ApsaraDB RDS for MySQL instance resides.
    Replicate Data Across Alibaba Cloud Accounts In this example, select No because data is replicated within the same Alibaba Cloud account.
    RDS Instance ID Select the ID of the source ApsaraDB RDS for MySQL instance.
    Database Account Enter the database account of the source ApsaraDB RDS for MySQL instance. The account must have the read permissions on the objects to be synchronized.
    Database Password

    The password of the database account.

    Encryption

    Select Non-encrypted or SSL-encrypted based on your requirements. If you want to select SSL-encrypted, you must enable SSL encryption for the source instance before you configure the data synchronization task. For more information, see Configure SSL encryption for an ApsaraDB RDS for MySQL instance.

    Destination Database Select Instance
    Specifies whether to use an existing instance based on your needs.
    • If you use an existing instance, DTS automatically applies the parameter settings of the instance.
    • If you do not use an existing instance, you must configure the following parameters.
    Database Type Select DataHub.
    Access Method Select Alibaba Cloud Instance.
    Instance Region Select the region in which the DataHub instance resides.
    Project Select the project of the destination DataHub instance.
  4. In the lower part of the page, click Test Connectivity and Proceed.
    Note
    • You do not need to modify the security settings for ApsaraDB instances (such as ApsaraDB RDS for MySQL and ApsaraDB for MongoDB) and ECS-hosted databases. DTS automatically adds the CIDR blocks of DTS servers to the whitelists of ApsaraDB instances or the security group rules of ECS instances. For more information, see Add the CIDR blocks of DTS servers to the security settings of on-premises databases.
    • After data synchronization is complete, we recommend that you remove the CIDR blocks of DTS servers from the whitelists or security groups.
  5. Select the objects to synchronize.
    • Basic Settings
      Parameter Description
      Task Stages
      By default, Incremental Data Synchronization is selected. The only other stage that you can select is Schema Synchronization. Full Data Synchronization cannot be selected.
      Note During schema synchronization, DTS synchronizes the schemas of selected objects such as tables from the source database to the destination DataHub instance.
      Processing Mode of Conflicting Tables
      • Precheck and Report Errors: checks whether the destination database contains tables that have the same names as tables in the source database. If the source and destination databases do not contain identical table names, the precheck is passed. Otherwise, an error is returned during the precheck and the data synchronization task cannot be started.

        Note You can use the object name mapping feature to rename the tables that are synchronized to the destination database. You can use this feature if the source and destination databases contain identical table names and the tables in the destination database cannot be deleted or renamed. For more information, see Map object names.
      • Ignore Errors and Proceed: skips the precheck for identical table names in the source and destination databases.
        Warning If you select Ignore Errors and Proceed, data inconsistency may occur and your business may be exposed to potential risks.
        • If the source and destination databases have the same schema, and a data record has the same primary key as an existing data record in the destination database:
          • During full data synchronization, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.
          • During incremental data synchronization, DTS synchronizes the data record to the destination database. The existing data record in the destination database is overwritten.
        • If the source and destination databases have different schemas, initial data synchronization may fail. In this case, only part of the columns are synchronized, or the data synchronization task fails.
      Apply New Naming Rules of Additional Columns
      During data synchronization to a DataHub instance, DTS adds additional columns to the destination topic. If the names of additional columns are the same as those of existing columns in the destination topic, the data synchronization fails. Select Yes or No to specify whether you want to enable the new naming rules for additional columns.
      Warning Before you specify this parameter, check whether additional columns and existing columns in the destination topic have name conflicts. For more information, see Modify the naming rules for additional columns.
      Select Objects

      Select one or more objects from the Source Objects section and click the Rightwards arrow icon to add the objects to the Selected Objects section.

      Note You can select only tables as the objects to synchronize.
      Rename Databases and Tables
      • To rename an object in the destination instance, right-click the object in the Selected Objects section. For more information, see Map the name of a single object.
      • To rename multiple objects at a time in the destination instance, click Batch Edit in the upper-right corner of the Selected Objects section. For more information, see Map multiple object names at a time.
      Filter data

      You can specify WHERE conditions to filter data. For more information, see Use SQL conditions to filter data.

      Select the SQL operations to be synchronized In the Selected Objects section, right-click an object. In the dialog box that appears, select the DML operations that you want to synchronize. For more information, see SQL operations that can be synchronized.
    • Advanced Settings
      Parameter Description
      Set Alerts
      Specifies whether to set alerts for the data synchronization task. If you select Yes, DTS sends notifications to the alert contacts when the task fails or the synchronization latency exceeds the alert threshold.
      • No: does not set alerts.
      • Yes: sets alerts. In this case, you must also set the alert threshold and alert contacts.
      Capitalization of Object Names in Destination Instance

      Specifies the capitalization of database names, table names, and column names in the destination instance. By default, DTS default policy is selected. You can select other options to make sure that the capitalization of object names is consistent with that of the source or destination database. For more information, see Specify the capitalization of object names in the destination instance.

      Retry Time for Failed Connection
      Specifies the retry time range for failed connections. Valid values: 10 to 1440. Unit: minutes. Default value: 120. We recommend that you set the retry time range to more than 30 minutes. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
      Note
      • If an instance serves as the source or destination database of multiple data synchronization tasks, the smallest value that is specified for the instance takes precedence.
      • When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business needs. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.
  6. Optional: In the Selected Objects section, move the pointer over the destination topic and right-click the object. In the Edit dialog box that appears, set the shard key. The shard key is used for partitioning.
  7. Click Next: Save Task Settings and Precheck in the lower part of the page.
    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.
    • If the task fails to pass the precheck, you can click the Info icon next to each failed item to view details.
      • After you troubleshoot the issues based on the causes, you can run a precheck again.
      • If you do not need to troubleshoot the issues, you can ignore failed items and run a precheck again.
  8. Wait until the Success Rate becomes 100%. Then, click Next: Purchase Instance.
  9. On the Purchase Instance page, specify the billing method and specifications for the data synchronization instance. The following table describes related parameters.
    Section Parameter Description
    Parameters Billing method
    • Subscription: You pay for your subscription when you create an instance. The subscription billing method is more cost-effective than the pay-as-you-go billing method for long-term use.
    • Pay-as-you-go: A pay-as-you-go instance is billed on an hourly basis. For short-term use, we recommend that you select the pay-as-you-go billing method. If you no longer need a pay-as-you-go instance, you can release the instance to reduce costs.
    Instance Class DTS provides several instance classes that have different performance in synchronization speed. You can select an instance class based on your business scenario. For more information, see Specifications of data synchronization instances.
    Subscription Duration If you select the subscription billing method, set the subscription duration and the number of instances that you want to create. The subscription duration can be one to nine months or one to three years.
    Note This parameter is available only if you select the subscription billing method.
  10. Read and select Data Transmission Service (Pay-as-you-go) Service Terms.
  11. Click Buy and Start to start the data synchronization task. You can view the progress of the task in the task list.

Schema of a DataHub topic

When DTS synchronizes incremental data to a DataHub topic, DTS adds additional columns to store metadata in the topic. The following figure shows the schema of a DataHub topic.
Note In this example, id, name, and address are data fields. DTS adds the dts_ prefix to the data fields because the previous naming rules for additional columns are used.

The following table describes the additional columns in the DataHub topic.

Previous additional column name New additional column name Data type Description
dts_record_id new_dts_sync_dts_record_id String The unique ID of the incremental log entry.
Note
  • By default, the ID auto-increments for each new log entry. In disaster recovery scenarios, rollback may occur, and the ID may not auto-increment. Therefore, some IDs may be duplicated.
  • If an UPDATE operation is performed, DTS generates two incremental log entries to record the pre-update and post-update values. The values of the dts_record_id field in the two incremental log entries are the same.
dts_operation_flag new_dts_sync_dts_operation_flag String The operation type. Valid values:
  • I: an INSERT operation
  • D: a DELETE operation
  • U: an UPDATE operation
dts_instance_id new_dts_sync_dts_instance_id String The server ID of the database.
dts_db_name new_dts_sync_dts_db_name String The name of the database.
dts_table_name new_dts_sync_dts_table_name String The name of the table.
dts_utc_timestamp new_dts_sync_dts_utc_timestamp String The operation timestamp, in UTC. It is also the timestamp of the binary log file.
dts_before_flag new_dts_sync_dts_before_flag String Indicates whether the column values are pre-update values. Valid values: Y and N.
dts_after_flag new_dts_sync_dts_after_flag String Indicates whether the column values are post-update values. Valid values: Y and N.
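The relationship between the previous and new additional column names in the table above, and the name-conflict check recommended before you set Apply New Naming Rules of Additional Columns, can be sketched as follows. The function names are hypothetical, and the sketch assumes that a conflict means an exact match between an additional column name and an existing field name in the destination topic.

```python
# Previous additional column names, taken from the table above.
PREVIOUS_ADDITIONAL_COLUMNS = [
    "dts_record_id",
    "dts_operation_flag",
    "dts_instance_id",
    "dts_db_name",
    "dts_table_name",
    "dts_utc_timestamp",
    "dts_before_flag",
    "dts_after_flag",
]


def additional_columns(use_new_naming_rules):
    """Return the additional column names under the chosen naming rules."""
    if use_new_naming_rules:
        # Each new name is the previous name with the new_dts_sync_ prefix.
        return ["new_dts_sync_" + name for name in PREVIOUS_ADDITIONAL_COLUMNS]
    return list(PREVIOUS_ADDITIONAL_COLUMNS)


def find_conflicts(existing_topic_fields, use_new_naming_rules):
    """List additional columns whose names already exist in the topic."""
    existing = set(existing_topic_fields)
    return [c for c in additional_columns(use_new_naming_rules) if c in existing]


# Hypothetical topic that already has a field named dts_record_id.
topic_fields = ["id", "name", "dts_record_id"]
print(find_conflicts(topic_fields, use_new_naming_rules=False))
print(find_conflicts(topic_fields, use_new_naming_rules=True))
```

If the first call reports a conflict and the second does not, enabling the new naming rules avoids the failure described for that parameter.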

Additional information about the dts_before_flag and dts_after_flag fields

The values of the dts_before_flag and dts_after_flag fields in an incremental log entry vary with different operation types:

  • INSERT

    For an INSERT operation, the column values are the newly inserted record values (post-update values). The value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.

  • UPDATE

    DTS generates two incremental log entries for an UPDATE operation. The two incremental log entries have the same values for the dts_record_id, dts_operation_flag, and dts_utc_timestamp fields.

    The first log entry records the pre-update values. Therefore, the value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N. The second log entry records the post-update values. Therefore, the value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.

  • DELETE

    For a DELETE operation, the column values are the deleted record values (pre-update values). The value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N.

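The flag semantics above are enough for a consumer to rebuild the current state of a table from incremental log entries. The following is a minimal sketch, not a DTS or DataHub API: the function name `replay`, the in-memory dictionaries, and the sample entries are hypothetical. It assumes the previous naming rules, so the data fields appear as dts_id and dts_name, and it assumes that the pre-update entry of an UPDATE is read before the post-update entry.

```python
# Additional (metadata) columns under the previous naming rules.
METADATA_COLUMNS = {
    "dts_record_id", "dts_operation_flag", "dts_instance_id", "dts_db_name",
    "dts_table_name", "dts_utc_timestamp", "dts_before_flag", "dts_after_flag",
}


def replay(entries, key_field):
    """Apply log entries in order and return the rows keyed by key_field."""
    state = {}
    for entry in entries:
        # Keep only the data fields of the entry.
        row = {k: v for k, v in entry.items() if k not in METADATA_COLUMNS}
        if entry["dts_after_flag"] == "Y":
            # INSERT, or the post-update entry of an UPDATE.
            state[row[key_field]] = row
        else:
            # DELETE, or the pre-update entry of an UPDATE.
            state.pop(row[key_field], None)
    return state


def entry(record_id, op, before, after, dts_id, dts_name):
    """Build a sample log entry (hypothetical data)."""
    return {
        "dts_record_id": record_id, "dts_operation_flag": op,
        "dts_before_flag": before, "dts_after_flag": after,
        "dts_id": dts_id, "dts_name": dts_name,
    }


entries = [
    entry("1", "I", "N", "Y", 1, "alice"),
    entry("2", "U", "Y", "N", 1, "alice"),   # pre-update values
    entry("2", "U", "N", "Y", 1, "alicia"),  # post-update values
    entry("3", "I", "N", "Y", 2, "bob"),
    entry("4", "D", "Y", "N", 2, "bob"),
]
result = replay(entries, key_field="dts_id")
print(result)
```

Because every entry carries exactly one flag set to Y, the sketch does not need to branch on dts_operation_flag. Removing the row on the pre-update entry also handles an UPDATE that changes the key value.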