Data Integration supports batch and real-time synchronization from data sources such as MySQL, Oracle, and PolarDB to various destinations. This topic describes how to synchronize all data from a MySQL database to Kafka in both batch and real-time modes.
Prerequisites
You have purchased a Serverless resource group or an exclusive resource group for Data Integration.
You have created MySQL and Kafka data sources. For more information, see Create a data source for Data Integration.
You have established network connectivity between the resource group and data sources. For more information, see Network connectivity solutions.
Procedure
1. Select a synchronization task type
Go to the Data Integration page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Integration. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Integration.
In the left-side navigation pane, click Sync Task. Then, click Create Sync Task at the top of the page to go to the sync task creation page. Configure the following basic information:
Data Source And Destination: Select MySQL as the source and Kafka as the destination.
New Task Name: Enter a custom name for the synchronization task.
Sync Type: Select Database-wide Real-time.
Sync Steps: Select Full Sync and Incremental Sync.
2. Configure network and resources
In the Network And Resources section, select a Resource Group for the synchronization task and specify the number of compute units (CUs) to allocate for Task Resource Usage.
For Source Data Source, select the added MySQL data source. For Destination Data Source, select the added Kafka data source. Then, click Test Connectivity. After you confirm that both the source and destination data sources are connected, click Next.
3. Select the database and tables to synchronize
In this step, you can select the tables from the source data source in the Source Database and Tables section and click the icon to move them to the Selected Database and Tables section on the right.
4. Configure settings related to the destination
After you select the tables from which you want to synchronize data, the selected tables are automatically displayed in the Mapping Rules for Destination Tables section, with the properties of the destination tables waiting to be mapped. You must manually define mappings between the source tables and destination tables to determine the read and write relationships. Then, click Refresh in the Actions column. You can refresh the mappings between source tables and destination tables directly, or refresh them after you configure the destination-related settings described in the following sections.
You can select the tables to synchronize and click Batch Refresh Mapping. If no mapping rule is configured, the default table name rule is ${source_database_name}_${table_name}. If no table with the same name exists in the destination, a new table is automatically created.
You can click the Configure button in the Custom Destination Topic Name Mapping column to customize the destination topic name rule. You can combine built-in variables and manually entered strings to form the final Destination Topic Name, and you can edit the built-in variables. For example, you can create a topic name rule that adds a suffix to the source table name as the Destination Topic Name, as shown in the sketch below.
You can click the Configure button in the Write Key Value column to set the write key.
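As a reference, the following sketch illustrates how the default rule and a suffix-based custom rule expand into a Destination Topic Name. The function names and the sample database and table names are hypothetical and only demonstrate the variable substitution.

# Illustrative sketch of how the topic name rules expand; the function names
# and the sample database and table names are hypothetical.
def default_topic_name(source_database_name: str, table_name: str) -> str:
    # Default rule: ${source_database_name}_${table_name}
    return f"{source_database_name}_{table_name}"

def suffixed_topic_name(table_name: str, suffix: str = "_ods") -> str:
    # Custom rule example: append a suffix to the source table name.
    return f"{table_name}{suffix}"

print(default_topic_name("orders_db", "orders"))  # orders_db_orders
print(suffixed_topic_name("orders"))              # orders_ods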
a. Modify data type mappings for fields
Default mappings exist between data types of source fields and data types of destination fields. You can click Edit Mapping of Field Data Types in the upper-right corner of the Mapping Rules for Destination Tables section to configure data type mappings between source fields and destination fields based on your business requirements. After the configuration is complete, click Apply and Refresh Mapping.
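As an illustration of what such a mapping expresses, the following sketch pairs a few source field types with destination field types. The specific pairs and the fallback behavior are examples for this sketch, not the product's actual defaults.

# Illustrative sketch only: the type pairs below are examples, not the
# product's actual default mappings.
DEFAULT_TYPE_MAPPING = {
    "INT": "INTEGER",
    "BIGINT": "LONG",
    "DECIMAL": "STRING",   # example choice to avoid precision loss
    "DATETIME": "STRING",
    "VARCHAR": "STRING",
}

def map_field_type(source_type: str) -> str:
    # Fall back to STRING for unmapped types (an assumption for this sketch).
    return DEFAULT_TYPE_MAPPING.get(source_type.upper(), "STRING")

print(map_field_type("datetime"))  # STRING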
b. Add fields to the destination topic and assign values
When the destination topic is in the To Be Created state, you can add fields to the destination topic based on the source table schema. To add fields and assign values to them, use one of the following methods:
Add fields to a single table and assign values: Click the Configure button in the Destination Topic Field Assignment column. On the Additional Fields page, click Add Field to add fields to the destination topic and assign values to the fields.
Batch assignment: Select multiple tables and choose the batch assignment option at the bottom of the list to assign values to the same fields in the destination tables at a time.
Note: You can assign constants and variables. You can click the icon to switch the assignment mode.
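Conceptually, field assignment adds extra keys to every record written to the destination topic. The following sketch shows the effect of one constant assignment and one variable assignment; the field names and values are examples only.

# Illustrative sketch only: the field names and values are examples of a
# constant assignment and a variable assignment.
import time

def add_assigned_fields(record: dict) -> dict:
    record["env"] = "prod"                         # constant assignment
    record["sync_time"] = int(time.time() * 1000)  # variable assignment
    return record

print(add_assigned_fields({"order_id": 1001}))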
c. Configure DML processing rules
Data Integration provides default DML processing rules. You can also configure DML processing rules for destination tables based on your business requirements.
Configure DML processing rules for a single destination table: Find the destination table for which you want to configure DML processing rules and click Configure in the Configure DML Rule column to configure DML processing rules for the table.
Configure DML processing rules for multiple destination tables at a time: Select the destination tables for which you want to configure DML processing rules, click Batch Modify in the lower part of the page, and then click Configure DML Rule.
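Conceptually, a DML processing rule decides, per destination table, whether an INSERT, UPDATE, or DELETE change from the source is written normally or ignored. The following sketch only illustrates that idea; the rule names, the event layout, and the topic name are assumptions made for this example.

# Illustrative sketch only: rule names, the event layout, and the topic name
# are assumptions made for this example.
DML_RULES = {
    "orders_db_orders": {"INSERT": "normal", "UPDATE": "normal", "DELETE": "ignore"},
}

def apply_dml_rule(topic: str, event: dict):
    rule = DML_RULES.get(topic, {}).get(event["op"], "normal")
    if rule == "ignore":
        return None    # drop the change instead of writing it
    return event       # write the change as-is

print(apply_dml_rule("orders_db_orders", {"op": "DELETE", "pk": 1001}))  # None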
d. Set destination topic properties
In the Destination Topic Name column, you can set topic properties, including the Number Of Partitions and Number Of Replicas for the topic.
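These settings correspond to the partition and replica counts that you would pass when creating the topic yourself. The following sketch assumes the kafka-python package, a reachable broker at localhost:9092, and an example topic name.

# Minimal sketch assuming the kafka-python package, a broker at
# localhost:9092, and an example topic name.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="orders_db_orders", num_partitions=3, replication_factor=2),
])
admin.close()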
e. Set the source split column
You can select a field from the source table in the Source Split Column drop-down list or select No Split.
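The split column lets the full synchronization phase read a large table in parallel ranges, so a numeric, evenly distributed column such as the primary key is usually a good choice. The following sketch only illustrates the idea of range-based splitting; the actual chunking logic used by the product may differ.

# Illustrative sketch of how a numeric split column divides the full read
# into parallel ranges; the product's internal chunking logic may differ.
def split_ranges(min_id: int, max_id: int, chunks: int):
    step = max(1, (max_id - min_id + 1) // chunks)
    bounds = list(range(min_id, max_id + 1, step)) + [max_id + 1]
    return [(bounds[i], bounds[i + 1] - 1) for i in range(len(bounds) - 1)]

# Conceptually, one SELECT ... WHERE id BETWEEN lo AND hi is issued per range.
print(split_ranges(1, 100, 4))  # [(1, 25), (26, 50), (51, 75), (76, 100)]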
f. Specify whether to perform full synchronization
If you selected Full Sync for Sync Steps when you selected a synchronization task type, you can disable full synchronization for specific tables here.
5. Configure alert rules
To prevent a failure of the synchronization task from delaying the synchronization of business data, you can configure different alert rules for the synchronization task.
In the upper-right corner of the page, click Configure Alert Rule to go to the Configure Alert Rule panel.
In the Configure Alert Rule panel, click Add Alert Rule. In the Add Alert Rule dialog box, configure the parameters to create an alert rule.
Note: The alert rules that you configure in this step take effect for the real-time synchronization subtask that is generated by the synchronization task. After the configuration of the synchronization task is complete, you can refer to Manage real-time synchronization tasks to go to the Real-time Synchronization Task page and modify the alert rules configured for the real-time synchronization subtask.
Manage alert rules.
You can enable or disable alert rules that are created. You can also specify different alert recipients based on the severity levels of alerts.
6. Configure advanced parameters
You can change the values of specific parameters configured for the synchronization task based on your business requirements. For example, you can specify an appropriate value for the Maximum read connections parameter to prevent the synchronization task from imposing excessive pressure on the source database and affecting data production.
To prevent unexpected errors or data quality issues, we recommend that you understand the meanings of the parameters before you change the values of the parameters.
In the upper-right corner of the configuration page, click Configure Advanced Parameters.
In the Configure Advanced Parameters panel, change the values of the desired parameters.
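For example, before you raise a parameter such as Maximum read connections, it can help to check the connection ceiling of the source instance. The following sketch assumes the PyMySQL package and placeholder connection details.

# Minimal sketch assuming the PyMySQL package and placeholder connection
# details; it checks the source instance's connection limit so that
# Maximum read connections can be set well below it.
import pymysql

conn = pymysql.connect(host="mysql-host", port=3306, user="user", password="****")
with conn.cursor() as cursor:
    cursor.execute("SHOW VARIABLES LIKE 'max_connections'")
    print(cursor.fetchone())   # for example: ('max_connections', '1000')
conn.close()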
7. Configure DDL processing rules
DDL operations may be performed on the source. You can click Configure DDL Capability in the upper-right corner of the page to configure rules to process DDL messages from the source based on your business requirements.
For more information, see Configure rules to process DDL messages.
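Conceptually, each category of DDL message from the source is mapped to a processing policy. The following sketch only illustrates that mapping; the category and policy names are assumptions made for this example, not the product's exact option list.

# Illustrative sketch only: the DDL categories and policy names are
# assumptions made for this example.
DDL_RULES = {
    "ADD_COLUMN": "normal",    # forward the schema change downstream
    "DROP_TABLE": "ignore",    # discard the message and keep the task running
    "RENAME_TABLE": "error",   # stop the task and surface the failure
}

def handle_ddl(message_type: str) -> str:
    return DDL_RULES.get(message_type, "normal")

print(handle_ddl("ADD_COLUMN"))  # normal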
8. View and change resource groups
You can click Configure Resource Group in the upper-right corner of the page to view and change the resource groups that are used to run the current synchronization task.
9. Run the synchronization task
After the configuration of the synchronization task is complete, click Complete in the lower part of the page.
In the Nodes section of the Data Integration page, find the created synchronization task and click Start in the Actions column.
Click the name or ID of the synchronization task in the Tasks section to view the detailed running process of the synchronization task.
Perform O&M operations on the synchronization task
View the status of the synchronization task
After you create a synchronization task, you can view the list of created synchronization tasks and the basic information of each synchronization task on the Sync Task page.
You can Start or Stop a synchronization task in the Operation column. You can also Edit or View a synchronization task by choosing More.
For a started task, you can view the basic running status in Execution Overview. You can also click the corresponding overview area to view the execution details.
A real-time synchronization task from MySQL to Kafka consists of three steps:
Schema Migration: This tab displays information such as whether the destination table is a newly created table or an existing table. For a newly created table, the DDL statement that is used to create the table is displayed.
Full Data Initialization: This tab displays information such as the source tables and destination tables involved in batch synchronization, the synchronization progress, and the number of data records that are synchronized.
Real-time Synchronization: This tab displays statistical information about real-time synchronization, including the synchronization progress, DDL records, DML records, and alert information.
Rerun the synchronization task
In some special cases, for example, when you add tables to or remove tables from the source, or when you change the schema or name of a destination table, you can click More in the Actions column of the synchronization task and then click Rerun to rerun the task after the change. During the rerun, the task synchronizes data only from the newly added tables to the destination, or only from the mapped source table to the destination table whose schema or name was changed.
If you want to rerun the synchronization task without modifying the configuration of the task, click More in the Actions column and then click Rerun to rerun the task to perform full synchronization and incremental synchronization again.
If you want to rerun the synchronization task after you add tables to or remove tables from the task, click Complete after the change. In this case, Apply Updates is displayed in the Actions column of the synchronization task. Click Apply Updates to trigger the system to rerun the synchronization task. During the rerun process, the synchronization task synchronizes data from the newly added tables to the destination. Data in the original tables is not synchronized again.
Appendix: Configure the formats of messages written to Kafka
After you configure a real-time synchronization task, you can run the task to read all the existing data in the source and write the data to the destination Kafka topics in the JSON format. The solution also generates a real-time synchronization node to write incremental data to Kafka in real time. In addition, incremental DDL-based data changes in the source are also written to Kafka in the JSON format in real time. You can obtain the status and change information of messages written to Kafka by referring to Appendix: Message Format.
In the JSON structure of data written to Kafka through offline synchronization tasks, the payload.sequenceId, payload.timestamp.eventTime, and payload.timestamp.checkpointTime fields are all set to -1.
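To inspect these fields yourself, you can consume a few records from the destination topic. The following sketch assumes the kafka-python package, a reachable broker at localhost:9092, and an example topic name; the payload field paths match those described above.

# Minimal sketch assuming the kafka-python package, a broker at
# localhost:9092, and an example topic name; it prints the payload fields
# described above for each consumed record.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "orders_db_orders",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for record in consumer:
    payload = record.value.get("payload", {})
    timestamp = payload.get("timestamp", {})
    print(payload.get("sequenceId"),
          timestamp.get("eventTime"),
          timestamp.get("checkpointTime"))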