You can create a real-time synchronization solution and use the solution to synchronize full and incremental data to Kafka. This topic describes how to create a real-time synchronization solution to synchronize data to Kafka.
Prerequisites
The required data sources are configured. Before you configure a synchronization task, you must configure the data sources from which you want to read data and to which you want to write data. This way, you can select the data sources when you configure a synchronization task. For information about the supported data source types and the configuration of a data source, see Supported data source types and read and write operations.
Note: For information about the items that you need to understand before you configure a data source, see Overview.
The data source environments are prepared. Before you configure a synchronization task, you must create an account that can be used to access a database and grant the account the permissions required to perform specific operations on the database based on your configurations for data synchronization. For more information, see Overview.
Background information
Item | Description
--- | ---
Number of tables from which you can read data |
Nodes | A real-time synchronization solution generates batch synchronization nodes to synchronize full data and real-time synchronization nodes to synchronize incremental data. The number of batch synchronization nodes that are generated by the solution varies based on the number of tables from which you can read data.
Data write | After you run a real-time synchronization solution, full data in the source is written to the destination by using batch synchronization nodes. Then, incremental data in the source is written to the destination by using real-time synchronization nodes.
Precautions
To run synchronization nodes on an exclusive resource group for Data Integration, make sure that the version of the DataX plug-in that is used to run batch synchronization nodes is 20210726203000 or later and that the version of the StreamX plug-in that is used to run real-time synchronization nodes is 202107121400 or later. Otherwise, a data format error may be reported, or the synchronization nodes that synchronize full or incremental data to Kafka may fail to run.
View the version of DataX: Log on to the DataWorks console and go to the Operation Center page. Search for a string such as DataX (20210709_keyindex-20210709144909), From Alibaba ! in the log area to view the version information of DataX, as shown in the third figure in this section.
View the version of StreamX: Log on to the DataWorks console and go to the Operation Center page. Search for a string such as StreamX (202107290000_20210729121213), From Alibaba ! in the log area to view the version information of StreamX, as shown in the following figure.
Procedure
Step 1: Select a synchronization solution
Go to the Data Integration page in the DataWorks console and click Create Data Synchronization Solution. On the Create Data Synchronization Solution page, select a source and a destination for data synchronization from the drop-down lists. Then, select One-click real-time synchronization to Kafka.
Step 2: Configure network connections for data synchronization
Select a source, a destination, and a resource group that is used to run nodes. Test the network connectivity to make sure that the resource group is connected to the source and destination. For more information, see Configure network connections for data synchronization.
Step 3: Configure the source and synchronization rules
- In the Basic Configuration section, configure the parameters, such as the Solution Name and Location parameters, based on your business requirements.
- In the Data Source section, confirm the information about the source.
- In the Source Table section, select the tables from which you want to read data from the Source Table list. Then, click the icon to add the tables to the Selected Source Table list.
The Source Table list displays all tables in the source. You can select all or specific tables.
In the Mapping Rules for Table Names section, click Add Rule, select a rule type, and then configure a mapping rule of the selected type.
By default, data in a source table is written to a Kafka topic that has the same name as the source table. You can specify a destination topic name in a mapping rule to write data in multiple source tables to the same Kafka topic. You can also specify prefixes in a mapping rule to write data in source tables with a specific prefix to Kafka topics with the same names as the source tables but a different prefix. You can use regular expressions to convert the names of the Kafka topics. You can also use built-in variables to add prefixes and suffixes to the names of destination topics.
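As an illustration of how such a mapping rule behaves, the following Python sketch converts source table names to destination topic names with a prefix-replacement regular expression. The ods_ and kafka_ prefixes and the rule syntax are assumptions chosen for this example, not the exact DataWorks rule format:

```python
import re

def map_table_to_topic(table_name, pattern=r"^ods_(.*)$", replacement=r"kafka_\1"):
    """Convert a source table name to a destination Kafka topic name.

    Tables that match the hypothetical 'ods_' prefix are renamed with a new
    prefix; all other tables keep their own names, which matches the default
    behavior described above (topic name = source table name).
    """
    new_name, count = re.subn(pattern, replacement, table_name)
    return new_name if count else table_name

print(map_table_to_topic("ods_orders"))  # prefix replaced by the rule
print(map_table_to_topic("customers"))   # no match: falls back to the table name
```

A rule that maps every table to one fixed topic name would instead return a constant, which corresponds to writing multiple source tables to the same Kafka topic.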
Step 4: Configure a destination topic
Specify the basic information.
Parameter
Description
Source tables without primary keys can be synchronized
Specifies whether a source table that does not have a primary key can be synchronized to the destination Kafka topic. If you select Source tables without primary keys can be synchronized in the Set Destination Topic step, a source table that does not have a primary key can be synchronized to the destination. However, empty strings are used as the keys in Kafka records during data synchronization. To ensure that changes of data with the same primary key value in the source table can be written to Kafka in an orderly manner, you must make sure that the Kafka topic to which the data changes are written contains only one partition.
Send heartbeat record
Specifies whether to send alert information to Kafka. If you select Send heartbeat record, the real-time synchronization node generated by the synchronization solution writes a record that contains the current timestamp to Kafka every 5 seconds. This way, you can view the updates of the timestamp for the latest record written to Kafka and check the progress of the data synchronization even if no new records are written to Kafka.
When one record in the source is updated, one Kafka record is generated
Specifies whether to store the data before and after an update in one Kafka record.
If you select this checkbox and one record in the source relational database is updated, the data before and after the update is stored in one Kafka record.
If you do not select this checkbox and one record in the source relational database is updated, the data before and after the update is separately stored in two Kafka records.
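The difference between the two modes can be sketched as follows. The field names used here (type, before, after) are hypothetical stand-ins for illustration, not the exact message format that DataWorks writes:

```python
import json

def update_as_one_record(before, after):
    # Checkbox selected: the before- and after-images of the updated row
    # are stored together in a single Kafka record.
    return [json.dumps({"type": "update", "before": before, "after": after})]

def update_as_two_records(before, after):
    # Checkbox cleared: the before- and after-images are stored separately
    # in two Kafka records.
    return [
        json.dumps({"type": "update_before", "data": before}),
        json.dumps({"type": "update_after", "data": after}),
    ]

before = {"id": 1, "status": "pending"}
after = {"id": 1, "status": "shipped"}
print(len(update_as_one_record(before, after)))   # 1
print(len(update_as_two_records(before, after)))  # 2
```

Consumers that reconcile updates in one pass are simpler to write against the single-record form, while the two-record form keeps each message smaller.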
Configure mappings between the source table and destination Kafka topic.
Click Refresh source table and Kafka Topic mapping to create a destination topic based on the rules you configured in the Mapping Rules for Table Names section in Step 3. If no mapping rule is configured in Step 3, data in the source table is written to the destination topic that has the same name as the source table. If no destination topic that has the same name as the source table exists, the system automatically creates such a destination topic. You can also change the method of creating the destination topic and add additional fields to the destination topic.
Note: The name of the destination topic is generated based on the mapping rules that you configured in the Mapping Rules for Table Names section.
Operation
Description
Select a primary key for a source table
If a table in the source database has a primary key, the values of the primary key are used as the keys in Kafka records during data synchronization. This ensures that changes of data with the same primary key value in the table are written to the same partition in Kafka in an orderly manner.
If a table in the source database does not have a primary key, the following situations occur:
If you select Source tables without primary keys can be synchronized in the Set Destination Topic step, source tables without primary keys can be synchronized. In this case, empty strings are used as the keys in Kafka records during data synchronization. To ensure that changes of data with the same primary key value in a source table are written to Kafka in an orderly manner, you must make sure that the Kafka topic to which the data changes are written contains only one partition. You can also click the icon to customize a primary key for a source table that does not have a primary key. You can use a field or a combination of several fields in the source table as the primary key. The values of the primary key are used as the keys in Kafka records during data synchronization.
If you do not select Source tables without primary keys can be synchronized in the Set Destination Topic step, errors occur when you synchronize source tables that do not have primary keys. In this case, you must delete these tables or click the icon to customize primary keys for the tables before the data synchronization can continue.
Select the method of creating a destination topic
You can set the Topic creation method to Create Topic or Use Existing Topic.
Use Existing Topic: If you select this method, you must select the desired destination topic from the drop-down list in the Kafka Topic column.
Create Topic: If you select this method, the name of the Kafka topic that is automatically created appears in the Kafka Topic column. You can click the automatically created topic to view and modify the name and description of the topic.
Edit additional fields
You can click Edit additional fields in the Actions column to add additional fields to a destination topic and assign values to the fields. The values can be constants or variables.
Note: You can add additional fields only if you select Create Topic from the drop-down list in the Topic creation method column.
Edit destination topics
By default, the synchronization solution generates destination topics based on the source tables. Therefore, field type conversion may occur. For example, if the data types of the fields in a destination topic are different from the data types of the fields in a source table, the synchronization solution converts the fields in the source table into the data types that can be written to the destination topic. You can click the name of a destination topic in the Kafka Topic column to view and modify the mappings between the fields of source and destination tables.
Note: You can edit a destination topic only if you select Create Topic from the drop-down list in the Topic creation method column.
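The ordering guarantee described above relies on Kafka's key-based partitioning: records that share a key are hashed to the same partition, so changes to one primary key value are consumed in write order. The following sketch simulates a deterministic hash partitioner (simplified for illustration; real Kafka clients use a murmur2 hash, and the partition count here is an assumed value):

```python
import hashlib

def choose_partition(key: str, num_partitions: int) -> int:
    # Deterministic hash of the record key, reduced to a partition index.
    # Simplified stand-in for the partitioner in real Kafka clients.
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Three consecutive changes to the same primary key value ("order-42").
changes = [("order-42", "created"), ("order-42", "paid"), ("order-42", "shipped")]
partitions = {choose_partition(key, 12) for key, _ in changes}
print(len(partitions))  # 1: every change for one key lands in one partition
```

A record with an empty-string key (a source table without a primary key) gives no such per-key guarantee, which is why the document requires a single-partition topic in that case.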
Step 5: Configure the resources required by the synchronization solution
After you create a synchronization solution, the synchronization solution generates batch synchronization nodes for full data synchronization and real-time synchronization nodes for incremental data synchronization. You must configure the parameters in the Configure Resources step.
You can configure the exclusive resource groups for Data Integration that you want to use to run real-time synchronization nodes and batch synchronization nodes, and the resource groups for scheduling that you want to use to run batch synchronization nodes. You can also click Advanced Configuration to configure the Number of concurrent writes on the target side and Allow Dirty Data Records parameters.
DataWorks uses resource groups for scheduling to issue the generated batch synchronization subtasks to resource groups for Data Integration and runs the subtasks on the resource groups for Data Integration. Therefore, a batch synchronization subtask also occupies the resources of a resource group for scheduling. You are charged fees for using the exclusive resource group for scheduling to schedule the batch synchronization subtasks. For information about the task issuing mechanism, see Mechanism for issuing nodes.
We recommend that you use different resource groups to run the generated batch and real-time synchronization subtasks. If you use the same resource group to run the subtasks, the subtasks compete for resources and affect each other. For example, CPU resources, memory resources, and networks used by the two types of subtasks may affect each other. In this case, the batch synchronization subtasks may slow down, or the real-time synchronization subtasks may be delayed. The batch or real-time synchronization subtasks may even be terminated by the out of memory (OOM) killer due to insufficient resources.
Step 6: Run the synchronization solution
Go to the Nodes page in Data Integration and find the created synchronization task.
Click Start or Commit and Run in the Actions column to run the synchronization task.
Click Running Details in the Actions column to view the running details of the synchronization task.
Appendix: Configure the formats of messages written to Kafka
After you configure and run a real-time synchronization solution, the solution generates a batch synchronization node to read all the existing data in the source and write the data to the destination Kafka topics in the JSON format. The solution also generates a real-time synchronization node to write incremental data to Kafka in real time. In addition, incremental DDL-based data changes in the source are also written to Kafka in the JSON format in real time. For more information about the formats of messages written to Kafka, see Appendix: Message formats.
If you run a batch synchronization node to synchronize data to Kafka, the payload.sequenceId, payload.timestamp.eventTime, and payload.timestamp.checkpointTime fields are set to -1 in the messages written to Kafka. The messages are in the JSON format.
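A minimal sketch of such a message is shown below. Only the three payload fields named above come from this document; the rest of the JSON structure, including the sample data row, is an illustrative assumption:

```python
import json

# Hypothetical shape of a message written by the generated batch
# synchronization node; the sequenceId and timestamp fields are set to -1
# as described above, while the surrounding structure is assumed.
message = {
    "payload": {
        "sequenceId": -1,
        "timestamp": {"eventTime": -1, "checkpointTime": -1},
        "data": {"id": 1, "name": "example"},
    }
}

encoded = json.dumps(message)       # what would be written to the Kafka topic
decoded = json.loads(encoded)
print(decoded["payload"]["sequenceId"])  # -1
```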
What to do next
After a synchronization task is configured, you can manage the task. For example, you can add source tables to or remove source tables from the task, configure alerting and monitoring settings for the subtasks that are generated by the task, and view information about the running of the subtasks. For more information, see Perform O&M on a full and incremental synchronization task.