DataWorks:Create a real-time synchronization solution to synchronize data to Kafka

Last Updated: Dec 20, 2023

You can create a real-time synchronization solution and use the solution to synchronize full and incremental data to Kafka. This topic describes how to create a real-time synchronization solution to synchronize data to Kafka.

Prerequisites

  1. The required data sources are configured. Before you configure a synchronization task, you must configure the data sources from which you want to read data and to which you want to write data. This way, you can select the data sources when you configure a synchronization task. For information about the supported data source types and the configuration of a data source, see Supported data source types and read and write operations.

    Note

    For information about the items that you need to understand before you configure a data source, see Overview.

  2. The data source environments are prepared. Before you configure a synchronization task, you must create an account that can be used to access the database and grant the account the permissions on the database that are required by your data synchronization configuration. For more information, see Overview.

Background information

Number of tables from which you can read data

  • You can read data from multiple source tables and write the data to multiple destination topics.

  • You can configure mapping rules for the source and destination. This way, you can read data from multiple tables and write the data to the same destination topic.

Nodes

A real-time synchronization solution generates batch synchronization nodes to synchronize full data and real-time synchronization nodes to synchronize incremental data. The number of batch synchronization nodes that are generated varies based on the number of source tables.

Data write

After you run a real-time synchronization solution, full data in the source is written to the destination by using batch synchronization nodes. Then, incremental data in the source is written to the destination by using real-time synchronization nodes.

  • If a source table has a primary key, the values of the primary key are used as the keys in Kafka records during data synchronization. This ensures that changes of data with the same primary key value in the source table are written to the same partition in Kafka in an orderly manner.

  • For a source table that does not have a primary key, if you select Source tables without primary keys can be synchronized when you configure the destination, empty strings are used as the keys in Kafka records during data synchronization. In this case, to ensure that data changes in the source table are written to Kafka in an orderly manner, you must make sure that the Kafka topic to which the data changes are written contains only one partition. Alternatively, you can customize a primary key for such a table when you configure the destination: a field or a combination of several fields in the source table is used as the primary key, and the values of the primary key are used as the keys in Kafka records during data synchronization.

  • If you want to make sure that changes of data with the same primary key value in a source table are written to the same partition in Kafka in an orderly manner even when a response exception occurs on the Kafka data source, you must add the following settings to the extended parameters when you add the Kafka data source to DataWorks (see the producer sketch after this list):

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    Important

    After you add the settings to the extended parameters of the Kafka data source, data synchronization performance is significantly degraded. You must balance write performance against the ordering guarantee.

  • For more information about the format of a Kafka message, format of a heartbeat message generated by a synchronization node, and format of a Kafka message for a data change in the source, see Appendix: Message formats.
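
The ordering guarantee described above relies on standard Kafka producer behavior: records that share a key are hashed to the same partition, and setting max.in.flight.requests.per.connection to 1 prevents producer retries from reordering writes. The following minimal sketch illustrates this behavior with the Kafka Java producer. It is not code generated by DataWorks; the broker address, topic name, primary-key value, and payload are hypothetical.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class OrderedKeyedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            // The settings from the extended parameters above: at most one
            // in-flight request per connection, so retries cannot reorder
            // writes, at the cost of throughput.
            props.put("max.in.flight.requests.per.connection", 1);
            props.put("buffer.memory", 100554432L);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // The primary-key value (or a concatenation of the chosen key
                // fields) serves as the record key, so all changes to the same
                // row hash to the same partition and arrive in order.
                String primaryKey = "42"; // hypothetical primary-key value
                String changeJson = "{\"id\":42,\"name\":\"alice\"}"; // hypothetical payload
                producer.send(new ProducerRecord<>("my_topic", primaryKey, changeJson));
            }
        }
    }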

Precautions

To run synchronization nodes on an exclusive resource group for Data Integration, make sure that the version of the DataX plug-in that runs batch synchronization nodes is 20210726203000 or later and that the version of the StreamX plug-in that runs real-time synchronization nodes is 202107121400 or later. Otherwise, a data format error may be reported, or the nodes that synchronize full or incremental data to Kafka may fail to run.

View the version of DataX: Log on to the DataWorks console and go to the Operation Center page. In the left-side navigation pane, choose Cycle Task Maintenance > Patch Data. In the directed acyclic graph (DAG) of the batch synchronization node for which data is backfilled, right-click the node name and select View Runtime Log. On the page that appears, search for Detail log url in the log area and click the link to go to the page that displays the details of the node. Then, search for the version information in the format of DataX( ..... ),From Alibaba! For example, DataX (20210709_keyindex-20210709144909), From Alibaba ! indicates the version of DataX.

View the version of StreamX: Log on to the DataWorks console and go to the Operation Center page. In the left-side navigation pane, choose RealTime Task Maintenance > Real Time DI. On the Real Time DI page, click the name of the desired real-time synchronization node. Then, click the Log tab and search for the version information in the format of StreamX( ..... ),From Alibaba! in the log area. For example, StreamX (202107290000_20210729121213), From Alibaba ! indicates the version of StreamX.

Procedure

  1. Step 1: Select a synchronization solution

  2. Step 2: Configure network connections for data synchronization

  3. Step 3: Configure the source and synchronization rules

  4. Step 4: Configure a destination topic

  5. Step 5: Configure the resources required by the synchronization solution

  6. Step 6: Run the synchronization solution

Step 1: Select a synchronization solution

Go to the Data Integration page in the DataWorks console and click Create Data Synchronization Solution. On the Create Data Synchronization Solution page, select a source and a destination for data synchronization from the drop-down lists. Then, select One-click real-time synchronization to Kafka.

Step 2: Configure network connections for data synchronization

Select a source, a destination, and a resource group that is used to run nodes. Test the network connectivity to make sure that the resource group is connected to the source and destination. For more information, see Configure network connections for data synchronization.

Step 3: Configure the source and synchronization rules

  1. In the Basic Configuration section, configure the parameters, such as the Solution Name and Location parameters, based on your business requirements.
  2. In the Data Source section, confirm the information about the source.
  3. In the Source Table section, select the tables from which you want to read data from the Source Table list. Then, click the icon between the two lists to add the tables to the Selected Source Table list.

    The Source Table list displays all tables in the source. You can select all or specific tables.

  4. In the Mapping Rules for Table Names section, click Add Rule, select a rule type, and then configure a mapping rule of the selected type.

    By default, data in a source table is written to the Kafka topic that has the same name as the source table. You can specify a destination topic name in a mapping rule to write data from multiple source tables to the same Kafka topic, or specify prefixes to write data from source tables with a specific prefix to Kafka topics that keep the table names but carry a different prefix. You can use regular expressions to convert the names of the destination topics, and use built-in variables to add prefixes and suffixes to the topic names. The sketch that follows illustrates a prefix-conversion rule.
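
    Conceptually, a prefix-conversion rule is a regular-expression replacement applied to each source table name. The following sketch illustrates the idea in Java with a hypothetical rule that maps tables named ods_* to topics named kafka_*; the rule syntax in the DataWorks UI differs, and all names here are illustrative only.

        import java.util.List;
        import java.util.regex.Pattern;

        public class TopicMappingSketch {
            public static void main(String[] args) {
                // Hypothetical source table names.
                List<String> tables = List.of("ods_orders", "ods_users", "dim_region");
                // Hypothetical rule: replace the ods_ prefix with kafka_. Tables
                // that do not match keep their own name as the topic name.
                Pattern rule = Pattern.compile("^ods_");
                for (String table : tables) {
                    String topic = rule.matcher(table).replaceFirst("kafka_");
                    System.out.println(table + " -> " + topic);
                }
            }
        }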

Step 4: Configure a destination topic

  1. Specify the basic information.

    Source tables without primary keys can be synchronized

    Specifies whether a source table that does not have a primary key can be synchronized to the destination Kafka topic. If you select Source tables without primary keys can be synchronized in the Set Destination Topic step, a source table that does not have a primary key can be synchronized to the destination, but empty strings are used as the keys in Kafka records during data synchronization. To ensure that data changes in the source table are written to Kafka in an orderly manner, you must make sure that the Kafka topic to which the data changes are written contains only one partition. The sketch at the end of this parameter list shows how to create such a single-partition topic.

    Send heartbeat record

    Specifies whether to send heartbeat records to Kafka. If you select Send heartbeat record, the real-time synchronization node generated by the synchronization solution writes a record that contains the current timestamp to Kafka every 5 seconds. This way, you can watch the timestamp of the latest record advance and track the progress of data synchronization even if no new records are written from the source.

    When one record in the source is updated, one Kafka record is generated

    Specifies whether to store the data before and after an update in one Kafka record.

    • If you select this checkbox and one record in the source relational database is updated, the data before and after the update is stored in one Kafka record.

    • If you do not select this checkbox and one record in the source relational database is updated, the data before and after the update is separately stored in two Kafka records.
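
    For tables that are synchronized without a primary key, you can satisfy the single-partition requirement by creating the destination topic yourself before you run the solution. The following is a minimal sketch that uses the Kafka Java AdminClient; the broker address, topic name, and replication factor are hypothetical.

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.AdminClient;
        import org.apache.kafka.clients.admin.NewTopic;

        public class SinglePartitionTopicSketch {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
                try (AdminClient admin = AdminClient.create(props)) {
                    // A single partition keeps records in order even though the
                    // record keys are empty strings; the replication factor of 1
                    // is illustrative only.
                    NewTopic topic = new NewTopic("table_without_pk", 1, (short) 1);
                    admin.createTopics(Collections.singletonList(topic)).all().get();
                }
            }
        }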

  2. Configure mappings between the source table and destination Kafka topic.

    Click Refresh source table and Kafka Topic mapping to create a destination topic based on the rules you configured in the Mapping Rules for Table Names section in Step 3. If no mapping rule is configured in Step 3, data in the source table is written to the destination topic that has the same name as the source table. If no destination topic that has the same name as the source table exists, the system automatically creates such a destination topic. You can also change the method of creating the destination topic and add additional fields to the destination topic.

    Note

    The name of the destination topic is generated based on the mapping rules that you configured in the Mapping Rules for Table Names section.

    Select a primary key for a source table

    • If a table in the source database has a primary key, the values of the primary key are used as the keys in Kafka records during data synchronization. This ensures that changes of data with the same primary key value in the table are written to the same partition in Kafka in an orderly manner.

    • If a table in the source database does not have a primary key, one of the following situations occurs:

      • If you select Source tables without primary keys can be synchronized in the Set Destination Topic step, source tables without primary keys can be synchronized. In this case, empty strings are used as the keys in Kafka records during data synchronization. To ensure that data changes in a source table are written to Kafka in an orderly manner, you must make sure that the Kafka topic to which the data changes are written contains only one partition. You can also click the Edit icon to customize a primary key for a source table that does not have a primary key. You can use a field or a combination of several fields in the source table as the primary key, and the values of the primary key are then used as the keys in Kafka records during data synchronization.

      • If you do not select Source tables without primary keys can be synchronized in the Set Destination Topic step, an error occurs when a source table without a primary key is synchronized. In this case, you must remove these tables from the solution or click the Edit icon to customize primary keys for the tables before data synchronization can continue.

    Select the method of creating a destination topic

    You can set the Topic creation method to Create Topic or Use Existing Topic.

    • Use Existing Topic: If you select this method, you must select the desired destination topic from the drop-down list in the Kafka Topic column.

    • Create Topic: If you select this method, the name of the Kafka topic that is automatically created appears in the Kafka Topic column. You can click the automatically created topic to view and modify the name and description of the topic.

    Edit additional fields

    You can click Edit additional fields in the Actions column to add additional fields to a destination topic and assign values to the fields. The values can be constants or variables.

    Note

    You can add additional fields only if you select Create Topic from the drop-down list in the Topic creation method column.

    Edit destination topics

    By default, the synchronization solution generates destination topics based on the schemas of the source tables, so field type conversion may occur during synchronization. If the data types of the fields in a destination topic differ from those in a source table, the solution converts the source fields into data types that can be written to the destination topic. You can click the name of a destination topic in the Kafka Topic column to view and modify the mappings between the fields of the source table and the destination topic.

    Note

    You can edit a destination topic only if you select Create Topic from the drop-down list in the Topic creation method column.

Step 5: Configure the resources required by the synchronization solution

After you create a synchronization solution, the synchronization solution generates batch synchronization nodes for full data synchronization and real-time synchronization nodes for incremental data synchronization. You must configure the parameters in the Configure Resources step.

You can configure the exclusive resource groups for Data Integration that you want to use to run real-time synchronization nodes and batch synchronization nodes, and the resource groups for scheduling that you want to use to run batch synchronization nodes. You can also click Advanced Configuration to configure the Number of concurrent writes on the target side and Allow Dirty Data Records parameters.

Note
  • DataWorks uses resource groups for scheduling to issue the generated batch synchronization subtasks to resource groups for Data Integration, and the subtasks then run on the resource groups for Data Integration. Therefore, a batch synchronization subtask also occupies the resources of a resource group for scheduling, and you are charged for using an exclusive resource group for scheduling to schedule the subtasks. For information about the task issuing mechanism, see Mechanism for issuing nodes.

  • We recommend that you use different resource groups to run the generated batch and real-time synchronization subtasks. If you use the same resource group to run the subtasks, the subtasks compete for resources and affect each other. For example, CPU resources, memory resources, and networks used by the two types of subtasks may affect each other. In this case, the batch synchronization subtasks may slow down, or the real-time synchronization subtasks may be delayed. The batch or real-time synchronization subtasks may even be terminated by the out of memory (OOM) killer due to insufficient resources.

Step 6: Run the synchronization solution

  1. Go to the Nodes page in Data Integration and find the created synchronization task.

  2. Click Start or Commit and Run in the Actions column to run the synchronization task.

  3. Click Running Details in the Actions column to view the running details of the synchronization task.

Appendix: Formats of messages written to Kafka

After you configure and run a real-time synchronization solution, the solution generates a batch synchronization node that reads all the existing data in the source and writes the data to the destination Kafka topics in the JSON format, and a real-time synchronization node that writes incremental data changes, including DDL-based changes, to Kafka in the JSON format in real time. For more information about the formats of messages written to Kafka, see Appendix: Message formats.

Note

If you run a batch synchronization node to synchronize data to Kafka, the messages written to Kafka are in the JSON format, and the payload.sequenceId, payload.timestamp.eventTime, and payload.timestamp.checkpointTime fields in these messages are set to -1.
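
Downstream consumers can use these sentinel values to tell records written by the full (batch) phase apart from records written by the real-time phase. The following sketch assumes the JSON layout described above and a Jackson dependency on the classpath; the broker address, consumer group, and topic name are hypothetical.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class FullSyncRecordDetectorSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
            props.put("group.id", "sync-audit");              // hypothetical consumer group
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            ObjectMapper mapper = new ObjectMapper();

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my_topic")); // hypothetical topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode payload = mapper.readTree(record.value()).path("payload");
                    // Batch (full) synchronization sets payload.sequenceId to -1,
                    // as noted in the preceding section.
                    boolean fromFullSync = "-1".equals(payload.path("sequenceId").asText());
                    System.out.println((fromFullSync ? "full" : "incremental")
                            + " record at offset " + record.offset());
                }
            }
        }
    }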

What to do next

After a synchronization task is configured, you can manage the task. For example, you can add source tables to or remove source tables from the task, configure alerting and monitoring settings for the subtasks that are generated by the task, and view information about the running of the subtasks. For more information, see Perform O&M on a full and incremental synchronization task.