After you prepare data sources, network environments, and resources, you can create a real-time synchronization node to synchronize data to Kafka. This topic describes how to create a real-time synchronization node and view the status of the node.

Prerequisites

  1. The data sources that you want to use are prepared. Before you configure a data synchronization node, you must prepare the data sources from which you want to read data and to which you want to write data. This way, when you configure a data synchronization node, you can select the data sources. For information about the data source types, readers, and writers that are supported by real-time synchronization, see Data source types that support real-time synchronization.
    Note For information about the items that you need to understand before you prepare a data source, see Overview.
  2. An exclusive resource group for Data Integration that meets your business requirements is purchased. For more information, see Create and use an exclusive resource group for Data Integration.
  3. Network connections are established between the exclusive resource group for Data Integration and the data sources. For more information, see Establish a network connection between a resource group and a data source.
  4. The data source environments are prepared. You must create an account that can be used to access a database in the source and an account that can be used to access a database in the destination. You must also grant the accounts the permissions required to perform specific operations on the databases based on your configurations for data synchronization. For more information, see Overview.

Limits

  • You can use only exclusive resource groups for Data Integration to run real-time synchronization nodes.
  • You can use a real-time synchronization node to synchronize data only from a MySQL, Oracle, or PolarDB data source to Kafka.

Precautions

  • If a source table has a primary key, the values of the primary key are used as the keys in Kafka records. This ensures that changes of data that use the same primary key value in the source table are sequentially written to the same partition in Kafka.
  • If you select source tables that do not have a primary key for synchronization when you configure the destination, empty strings are used as the keys in Kafka records during data synchronization. To ensure that data changes in the source table are sequentially written to Kafka, you must make sure that the Kafka topic to which the data changes are written contains only one partition. You can also specify a custom primary key for a source table that does not have a primary key when you configure the destination table. In this case, a field or a combination of multiple fields in the source table is used as the primary key, and the values of the primary key are used as the keys in Kafka records during data synchronization.
  • To ensure that changes of data that use the same primary key value in the source table are sequentially written to the same partition in Kafka even when a response exception occurs on the Kafka data source, you must add the following configurations to the extended parameters when you add the Kafka data source to DataWorks:
    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}
    Important After you add these configurations to the extended parameters of the Kafka data source, data synchronization performance is significantly degraded. You must balance write performance against the ordering of write operations. For a producer-level view of what these settings do, see the sketch after this list.
  • For more information about the format of a Kafka message, format of a heartbeat message that is generated by a synchronization node, and format of a Kafka message for data changes in the source, see Appendix: Message formats.
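
The two extended parameters above correspond to standard Kafka producer settings, so their effect can be illustrated with a plain producer outside DataWorks. The following Java sketch is illustrative only; the broker address, topic name, key, and payload are assumptions, not part of the DataWorks configuration.

    // Illustrative sketch: how the extended parameters map to standard
    // Kafka producer settings. Broker address and topic name are assumptions.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class OrderedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            // The two settings from the precaution above: allow only one
            // in-flight request per connection so that retries cannot reorder
            // writes within a partition, and enlarge the send buffer.
            props.put("max.in.flight.requests.per.connection", "1");
            props.put("buffer.memory", "100554432");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Records with the same key hash to the same partition,
                // so per-key write order is preserved.
                producer.send(new ProducerRecord<>("orders", "pk-1001", "{\"op\":\"UPDATE\"}"));
            }
        }
    }

Limiting in-flight requests to one is what makes the ordering guarantee hold under retries, and it is also why throughput drops: the producer waits for each request to be acknowledged before sending the next one.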

Create a real-time synchronization node

  1. Create a real-time synchronization node to synchronize all data in a database.
  2. Configure a resource group.
  3. Configure the source and synchronization rules.
    1. In the Data Source section of the Configure Source and Synchronization Rules step, configure the Type, Data source, and Encoding parameters.
      Note You can set the Type parameter only to MySQL, Oracle, or PolarDB.
    2. Select the tables from which you want to read data.
      In the Source Table section, all tables in the selected data source are displayed in the Source Table list. You can select all or some tables in the Source Table list and click the move icon to move them to the Selected Source Table list.
      Important If a selected table does not have a primary key, the table cannot be synchronized in real time.
    3. In the Source Table section, select The input text is used as a regular expression to automatically add tables. You can select this check box only if you set the Type parameter to MySQL.
      Then, you can enter a regular expression in the Database Filtering and Table Filtering search boxes. When the real-time synchronization node runs, DataWorks uses binary logs to check for data changes in the databases and tables whose names match the regular expressions. If DataWorks identifies data changes, DataWorks synchronizes the changes to the Kafka topics that are determined by the mapping rules that you configure in the Conversion Rule for Table Name section. If a specified Kafka topic does not exist, DataWorks creates a topic with that name and writes the data to it.

      The following table describes the regular expressions that can be used to match databases and tables based on names.

      Regular expression | Description
      . | Matches any single character except a line feed.
      x? | Matches zero or one occurrence of the string x.
      x* | Matches zero or more occurrences of the string x.
      x+ | Matches one or more occurrences of the string x.
      .* | Matches zero or more occurrences of any character.
      .+ | Matches one or more occurrences of any character.
      {m} | Matches exactly m occurrences of the preceding element.
      {m,n} | Matches at least m and at most n occurrences of the preceding element.
      {m,} | Matches at least m occurrences of the preceding element.
      [] | Matches a single character from the character set that is defined in the brackets.
      [^] | Matches a single character that is not in the character set that is defined in the brackets.
      \d | Matches a digit. Equivalent to [0-9].
      \d+ | Matches one or more digits. Equivalent to [0-9]+.
      \D | Matches a non-digit character. Equivalent to [^0-9].
      \D+ | Matches one or more non-digit characters. Equivalent to [^0-9]+.
      \w | Matches a word character, that is, a letter, digit, or underscore. Equivalent to [a-zA-Z0-9_].
      \w+ | Matches one or more word characters. Equivalent to [a-zA-Z0-9_]+.
      \W | Matches a non-word character. Equivalent to [^a-zA-Z0-9_].
      \W+ | Matches one or more non-word characters. Equivalent to [^a-zA-Z0-9_]+.
      \s | Matches a whitespace character. Equivalent to [ \n\t\r\f].
      \s+ | Matches one or more whitespace characters. Equivalent to [ \n\t\r\f]+.
      \S | Matches a non-whitespace character. Equivalent to [^ \n\t\r\f].
      \S+ | Matches one or more non-whitespace characters. Equivalent to [^ \n\t\r\f]+.
      \b | Matches a word boundary, that is, the position between a word character and a non-word character.
      \B | Matches a position that is not a word boundary.
      (a|b|c) | Matches a, b, or c, where a, b, and c each specify a string. Make sure that the alternatives are enclosed in parentheses (). Otherwise, errors may occur when you run the expression.
      \ | Escapes special characters in a regular expression, such as $ ^ [ ] . ? + | \ * { }.
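
      As an illustration, a table filter such as user_\d+ matches table names user_01 and user_202 but not user_archive. The table names and the pattern in the following Java sketch are hypothetical:

          import java.util.regex.Pattern;

          public class TableFilterSketch {
              public static void main(String[] args) {
                  // Hypothetical filter: "user_" followed by one or more digits.
                  Pattern filter = Pattern.compile("user_\\d+");
                  for (String table : new String[] {"user_01", "user_202", "user_archive"}) {
                      // matches() requires the entire table name to match the expression.
                      System.out.println(table + " -> " + filter.matcher(table).matches());
                  }
                  // Output: user_01 -> true, user_202 -> true, user_archive -> false
              }
          }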
    4. In the Conversion Rule for Table Name section, click Add Rule, select a rule type, and then configure a mapping rule based on the rule type that you selected.
      By default, data in the source tables is written to the destination Kafka topics that are named the same as the source tables. You can also configure mapping rules to define the names of the destination Kafka topics to which you want to write data. You can configure a mapping rule to synchronize data from multiple tables in the source to the same topic in the destination. You can also configure a mapping rule to synchronize data from source tables whose names start with a specified prefix to the destination topics whose names start with another specified prefix. Data Integration allows you to use a regular expression to configure a mapping rule to specify the names of the destination Kafka topics. You can also concatenate built-in variables to specify the names of the destination Kafka topics. For more information about the configuration logic, see Configure the source and synchronization rules.
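
      The prefix-conversion logic can be pictured as a simple string rewrite. The mapping rule itself is configured in the UI, not in code; the following Java sketch only illustrates the idea, and the prefixes ods_ and topic_ are hypothetical:

          public class TopicNameMappingSketch {
              // Hypothetical prefix rule: source tables "ods_*" map to topics "topic_*".
              static String destinationTopic(String sourceTable) {
                  if (sourceTable.startsWith("ods_")) {
                      return "topic_" + sourceTable.substring("ods_".length());
                  }
                  return sourceTable; // default: the topic name equals the source table name
              }

              public static void main(String[] args) {
                  System.out.println(destinationTopic("ods_orders")); // topic_orders
                  System.out.println(destinationTopic("customers"));  // customers
              }
          }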
  4. Configure the destination topics.
    1. Configure the basic information.
      Parameter | Description
      Source tables without primary keys can be synchronized | Specifies whether a source table that does not have a primary key can be synchronized to Kafka. If you select this option in the Set Destination Topic step, a source table that does not have a primary key can be synchronized to the destination. In this case, empty strings are used as the keys in Kafka records during data synchronization. To ensure that data changes in the source table are sequentially written to Kafka, you must make sure that the Kafka topic to which the data changes are written contains only one partition.
      Send heartbeat record | Specifies whether to send heartbeat records to Kafka. If you select this option, the real-time synchronization node writes a record that contains the current timestamp to Kafka every 5 seconds. This way, you can view the updates of the timestamp of the most recent record that is written to Kafka and check the progress of data synchronization even if no new records are written to Kafka. For more information about the format of heartbeat records, see Appendix: Message formats.
      When one record in the source is updated, one Kafka record is generated | Specifies how the data changes that are generated by an update operation on a source table are written to the destination.
      • If you synchronize data from a relational database and select this option, the data changes that are generated by an update operation on a data record are stored in a single destination Kafka record that contains both the data before the change and the data after the change.
      • If you synchronize data from a relational database and do not select this option, the data changes that are generated by an update operation on a data record are stored in two destination Kafka records: one contains the data before the change, and the other contains the data after the change.
    2. Refresh mappings between source tables and destination Kafka topics.
      Click Refresh source table and Kafka Topic mapping to generate destination Kafka topics based on the mapping rule that you configured in the Conversion Rule for Table Name section. If no mapping rule is configured in the Conversion Rule for Table Name section, data in the source tables is written to the Kafka topics that are named the same as the source tables. If no destination Kafka topic exists in the destination, the system automatically creates a topic in the destination. You can also modify the topic generation method and add additional fields to the destination Kafka topics.
      Note The names of destination Kafka topics are automatically generated based on the mapping rule that you configured in the Conversion Rule for Table Name section.
      Operation | Description
      Synchronize a source table that does not have a primary key | Perform operations based on whether the table has a primary key:
      • If a table in the source database has a primary key, the values of the primary key are used as the keys in Kafka records during data synchronization. This ensures that changes of data that use the same primary key value in the source table are sequentially written to the same partition in Kafka.
      • If a table in the source database does not have a primary key, perform operations based on the following scenarios:
        • If you select Source tables without primary keys can be synchronized in the Set Destination Topic step, source tables that do not have primary keys can be synchronized. In this case, empty strings are used as the keys in Kafka records during data synchronization. To ensure that data changes in the source table are sequentially written to Kafka, you must make sure that the Kafka topic to which the data changes are written contains only one partition. You can also click the Edit icon to specify a custom primary key for a source table that does not have a primary key. You can use a field or a combination of multiple fields in the source table as the primary key. The values of the primary key are used as the keys in Kafka records during data synchronization.
        • If you do not select Source tables without primary keys can be synchronized in the Set Destination Topic step, errors occur when you synchronize source tables that do not have primary keys. In this case, delete the tables or click the Edit icon in the Synchronized Primary Key column to specify custom primary keys for the tables before you resume data synchronization.
      Select a topic creation method | The source of the destination Kafka topics. Valid values: Create Topic and Use Existing Topic.
      • If you select Use Existing Topic from the drop-down list in the Topic creation method column, you can select a destination topic from the drop-down list in the Kafka Topic column.
      • If you select Create Topic from the drop-down list in the Topic creation method column, the name of the topic that is automatically created appears in the Kafka Topic column. You can click the topic name to view and modify the name and description of the topic.
      Add additional fields to a destination Kafka topic and assign values to the fields | You can click Edit additional fields in the Actions column of a destination Kafka topic to add additional fields to the topic and assign values to the fields. You can assign constants and variables to the additional fields.
      Note You can add additional fields to a destination Kafka topic only if you select Create Topic from the drop-down list in the Topic creation method column.
    3. Click Next.
      If you select Create Topic from the drop-down list in the Topic creation method column, you must click Start table building in the Create Table dialog box to create Kafka topics.
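
      If you prefer to create destination topics yourself and use Use Existing Topic instead, for example, to guarantee the single-partition layout recommended for source tables without primary keys, the standard Kafka AdminClient can be used. The following sketch is an assumption-based illustration; the broker address, topic name, and replication factor are placeholders for your environment:

          import java.util.Collections;
          import java.util.Properties;
          import org.apache.kafka.clients.admin.AdminClient;
          import org.apache.kafka.clients.admin.NewTopic;

          public class CreateSinglePartitionTopicSketch {
              public static void main(String[] args) throws Exception {
                  Properties props = new Properties();
                  props.put("bootstrap.servers", "localhost:9092"); // assumption
                  try (AdminClient admin = AdminClient.create(props)) {
                      // One partition so that records keyed by empty strings
                      // (tables without primary keys) still preserve write order.
                      NewTopic topic = new NewTopic("topic_no_pk_table", 1, (short) 1);
                      admin.createTopics(Collections.singleton(topic)).all().get();
                  }
              }
          }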
  5. Configure the resources required to run the real-time synchronization node.
    1. In the Configure Resources step, configure the parameters.
      Parameter | Description
      Maximum number of connections supported by source read | The maximum number of Java Database Connectivity (JDBC) connections that are allowed for the source. Configure this parameter based on the resources of the source database. Default value: 15.
      Maximum number of parallel threads allowed to read by destination | The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. Maximum value: 32. Specify an appropriate value based on the specifications of the exclusive resource group for Data Integration and the data write capabilities of the destination.
    2. Click Complete Configuration.

Commit and deploy the real-time synchronization node

Commit and deploy the node.
  1. Click the Save icon in the top toolbar to save the node.
  2. Click the Submit icon in the top toolbar to commit the node.
  3. In the Commit Node dialog box, configure the Change description parameter.
  4. Click OK.
If you use a workspace in standard mode, you must deploy the node in the production environment after you commit the node. On the left side of the top navigation bar, click Deploy. For more information, see Deploy nodes.

What to do next

  • After the real-time synchronization node is configured, you can start and manage the node on the Real Time DI page in Operation Center. To go to the Real Time DI page, perform the following operations: Log on to the DataWorks console and go to the Operation Center page. In the left-side navigation pane of the Operation Center page, choose RealTime Task > RealTime DI. For more information, see Perform operations for a real-time synchronization node.
  • The data in the source is written to the destination Kafka topics in the JSON format. For more information about the formats of Kafka messages that indicate data changes and status of data in the data source, see Appendix: Message formats.
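
To verify what the node writes, including the heartbeat records described earlier, you can inspect a destination topic with a standard Kafka consumer. The following minimal sketch assumes a broker at localhost:9092, a topic named topic_orders, and a group ID sync-inspection; the exact JSON layout of each message is defined in Appendix: Message formats.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class InspectTopicSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption
            props.put("group.id", "sync-inspection");         // assumption
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton("topic_orders")); // assumption
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> r : records) {
                    // The key is the source primary key (or an empty string for
                    // tables without one); the value is the JSON message body.
                    System.out.printf("partition=%d key=%s value=%s%n",
                            r.partition(), r.key(), r.value());
                }
            }
        }
    }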