A single-table real-time synchronization task initializes the destination ApsaraDB for OceanBase table schema based on the structure of the messages in the specified source Kafka topic. The task then synchronizes historical data from that topic to ApsaraDB for OceanBase and continuously synchronizes incremental data in real time. This topic describes how to synchronize Kafka topic data to ApsaraDB for OceanBase in real time.
Prerequisites
You have purchased a Serverless resource group or an exclusive resource group for Data Integration.
You have created a Kafka data source and an ApsaraDB for OceanBase data source. For more information, see Create a data source for Data Integration.
You have completed the network connectivity between the resource group and the data sources. For more information, see Network connectivity solutions.
Procedure
Step 1: Select a synchronization task type
Go to the Data Integration page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Integration.
In the left-side navigation pane, click Synchronization Task, and then click Create Synchronization Task at the top of the page to go to the synchronization task creation page. Configure the following basic information:
Data Source And Destination: Kafka → ApsaraDB for OceanBase
New Task Name: Customize the synchronization task name.
Synchronization Type: Single-table Real-time.
Step 2: Configure network and resources
In the Network And Resource Configuration section, select the Resource Group used for the synchronization task. You can allocate the number of CUs for Task Resource Usage.
For Source Data Source, select the added Kafka data source. For Destination Data Source, select the added ApsaraDB for OceanBase data source. Then click Test Connectivity.
After you confirm that both the source data source and the destination data source are connected successfully, click Next.
Step 3: Configure the synchronization link
1. Configure the Kafka source
Click the Kafka data source at the top of the page to edit the Kafka Source Information.

In the Kafka Source Information section, select the topic in the Kafka cluster that you want to synchronize.
Retain default values for other parameters, or modify their configurations based on your business requirements.
Click Data Sampling in the upper-right corner.
In the dialog box that appears, specify the Start Time and Number Of Samples, and then click the Start Collection button to sample data from the specified Kafka topic. The sampled data lets you preview the content of the topic and serves as input for the data output preview and visual configuration of subsequent data processing nodes.
In the Output Field Configuration section, select the fields that you want to synchronize as needed.
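The sampling and field selection described above can be pictured with a small sketch. This is only an illustration, not how DataWorks implements sampling: the in-memory `topic` list stands in for a Kafka topic, and the function name `sample_records` and the record layout (`timestamp_ms`, `value`) are assumptions made for this example.

```python
from datetime import datetime, timezone


def sample_records(records, start_time, max_samples, selected_fields):
    """Return up to max_samples records at or after start_time,
    keeping only the selected output fields."""
    start_ms = int(start_time.timestamp() * 1000)
    sampled = []
    for rec in records:  # records are assumed to be in offset order
        if rec["timestamp_ms"] < start_ms:
            continue  # older than the requested Start Time
        sampled.append({f: rec["value"].get(f) for f in selected_fields})
        if len(sampled) >= max_samples:
            break  # Number Of Samples reached
    return sampled


# Hypothetical in-memory stand-in for messages in a Kafka topic.
topic = [
    {"timestamp_ms": 1_700_000_000_000, "value": {"id": 1, "name": "a", "note": "x"}},
    {"timestamp_ms": 1_700_000_060_000, "value": {"id": 2, "name": "b", "note": "y"}},
    {"timestamp_ms": 1_700_000_120_000, "value": {"id": 3, "name": "c", "note": "z"}},
]

start = datetime.fromtimestamp(1_700_000_060, tz=timezone.utc)
preview = sample_records(topic, start, max_samples=1, selected_fields=["id", "name"])
```

Here `preview` holds a single record with only the `id` and `name` fields, because the first message is older than the start time and the sample count is capped at one.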
2. Edit data processing nodes
You can click the icon to add data processing methods. The following data processing methods are supported: Data Masking, Replace String, Data Filtering, JSON Parsing, and Edit Field and Assign Value. You can arrange the data processing methods based on your business requirements. When the synchronization task is run, data is processed based on the processing order that you specify.

After you complete the configuration of a data processing node, you can click the Data Output Preview button in the upper-right corner. In the dialog box that appears, click Retrieve Upstream Output Again to simulate the result of the Kafka topic sample data after it is processed by the current data processing node.

The data output preview strongly depends on the Data Sampling of the Kafka source. Before you perform data output preview, you must complete data sampling in the Kafka source form.
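To see why the order of data processing nodes matters, and why the output preview needs sampled data as input, the following sketch models the processing methods as functions that are applied to each sampled row in the order given. It is a simplified illustration under stated assumptions: the helper names (`mask_field`, `replace_string`, `filter_rows`, `parse_json`, `assign_value`, `run_pipeline`) and the masking rule are hypothetical and do not reflect the actual DataWorks implementation.

```python
import json


def mask_field(field):
    """Data Masking: keep the first character, replace the rest with '*'."""
    def step(row):
        value = str(row.get(field, ""))
        return {**row, field: value[:1] + "*" * max(len(value) - 1, 0)}
    return step


def replace_string(field, old, new):
    """Replace String: substitute a substring in one field."""
    def step(row):
        return {**row, field: str(row.get(field, "")).replace(old, new)}
    return step


def filter_rows(predicate):
    """Data Filtering: drop rows for which the predicate is false."""
    def step(row):
        return row if predicate(row) else None
    return step


def parse_json(field):
    """JSON Parsing: expand a JSON string field into top-level fields."""
    def step(row):
        parsed = json.loads(row[field])
        rest = {k: v for k, v in row.items() if k != field}
        return {**rest, **parsed}
    return step


def assign_value(field, value):
    """Edit Field and Assign Value: set a field to a constant."""
    def step(row):
        return {**row, field: value}
    return step


def run_pipeline(rows, steps):
    """Apply the steps to every row in the configured order."""
    out = []
    for row in rows:
        for step in steps:
            row = step(row)
            if row is None:  # row was filtered out
                break
        if row is not None:
            out.append(row)
    return out


# Hypothetical sampled messages; each value is a JSON string.
sampled = [
    {"payload": '{"id": 1, "phone": "13800000000", "status": "ok"}'},
    {"payload": '{"id": 2, "phone": "13900000000", "status": "test"}'},
]
steps = [
    parse_json("payload"),
    filter_rows(lambda r: r["status"] != "test"),
    mask_field("phone"),
    assign_value("source", "kafka"),
]
result = run_pipeline(sampled, steps)
```

In this arrangement the JSON parsing step must come first, because the filter and the mask operate on fields that exist only after parsing. Previewing the output of a node corresponds to running the pipeline up to that node on the sampled rows.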
3. Configure ApsaraDB for OceanBase destination information
Click the ApsaraDB for OceanBase data destination at the top of the page to edit the OceanBase Destination Information.

In the OceanBase Destination Information section, select whether to Automatically Create Table or Use Existing Table for the OceanBase table to be written.
If you select to automatically create a table, a table with the same name as the data source table is created by default. You can manually modify the destination table name.
If you select to use an existing table, select the destination table that you want to synchronize from the drop-down list.
(Optional) Modify the schema of a destination table.
If you select Automatically Create Table, click Edit Table Schema. In the dialog box that appears, edit the schema of the destination table that will be automatically created. You can also click Re-generate Table Schema Based on Output Column of Ancestor Node to re-generate a schema based on the output columns of an ancestor node. You can select a column from the generated schema and configure the column as the primary key.
Note: The destination table must have a primary key. Otherwise, the configurations cannot be saved.
Configure mappings between fields in the source and fields in the destination.
After you complete the preceding configuration, the system automatically establishes mappings between fields in the source and fields in the destination based on the same-name mapping principle. You can modify the mappings based on your business requirements. One field in the source can map to multiple fields in the destination. Multiple fields in the source cannot map to the same field in the destination. If a field in the source has no mapped field in the destination, data in the field in the source is not synchronized to the destination.
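The mapping rules above can be expressed as a short sketch. The function names (`auto_map`, `validate_mappings`, `unsynchronized`) are hypothetical and chosen for this example; the logic only restates the rules in the preceding paragraph and is not taken from DataWorks itself.

```python
def auto_map(source_fields, dest_fields):
    """Same-name mapping: pair each source field with the
    destination field that has the same name, if any."""
    dest = set(dest_fields)
    return [(name, name) for name in source_fields if name in dest]


def validate_mappings(mappings):
    """Reject mappings where several source fields feed one destination field.
    One source field feeding several destination fields is allowed."""
    seen = {}
    for src, dst in mappings:
        if dst in seen and seen[dst] != src:
            raise ValueError(
                f"destination field {dst!r} is mapped from both "
                f"{seen[dst]!r} and {src!r}"
            )
        seen[dst] = src


def unsynchronized(source_fields, mappings):
    """Source fields without a mapped destination field are not synchronized."""
    mapped = {src for src, _ in mappings}
    return [f for f in source_fields if f not in mapped]


source = ["id", "name", "ts"]
dest = ["id", "name", "name_copy"]

mappings = auto_map(source, dest)
mappings.append(("name", "name_copy"))  # one source field, two destination fields
validate_mappings(mappings)
skipped = unsynchronized(source, mappings)
```

In this example `ts` ends up in `skipped` because no destination field is mapped to it, while `name` is written to both `name` and `name_copy`.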
4. Configure alert rules
To keep a failure of the synchronization task from delaying business data synchronization, you can configure alert rules for the synchronization task.
In the upper-right corner of the page, click Configure Alert Rule to go to the Configure Alert Rule panel.
In the Configure Alert Rule panel, click Add Alert Rule. In the Add Alert Rule dialog box, configure the parameters of the alert rule.
Note: The alert rules that you configure in this step take effect for the real-time synchronization subtask that will be generated by the synchronization task. After the configuration of the synchronization task is complete, you can refer to Manage real-time synchronization tasks to go to the Real-time Synchronization Task page and modify alert rules configured for the real-time synchronization subtask.
Manage alert rules.
You can enable or disable alert rules that are created. You can also specify different alert recipients based on the severity levels of alerts.
5. Configure advanced parameters
DataWorks allows you to modify the configurations of specific parameters. You can change the values of these parameters based on your business requirements.
To prevent unexpected errors or data quality issues, we recommend that you understand the meanings of the parameters before you change the values of the parameters.
In the upper-right corner of the configuration page, click Configure Advanced Parameters.
In the Configure Advanced Parameters panel, change the values of the desired parameters.
6. Configure resource groups
You can click Configure Resource Group in the upper-right corner of the page to view and change the resource groups that are used to run the current synchronization task.
7. Perform a test on the synchronization task
After the preceding configuration is complete, you can click Perform Simulated Running in the upper-right corner of the configuration page to have the synchronization task synchronize the sampled data to the destination table. You can view the synchronization result in the destination table. If specific configurations of the synchronization task are invalid, an exception occurs during the test run, or dirty data is generated, the system reports an error in real time. This helps you check the configurations of the synchronization task and determine as early as possible whether the expected results can be obtained.
In the dialog box that appears, configure the parameters for data sampling from the specified table, including the Start At and Sampled Data Records parameters.
Click Start Collection to enable the synchronization task to sample data from the source.
Click Preview to enable the synchronization task to synchronize the sampled data to the destination.
8. Run the synchronization task
After the configuration of the synchronization task is complete, click Complete in the lower part of the page.
In the Tasks section of the Synchronization Task page, find the created synchronization task and click Start in the Operation column.
Click the name or ID of the synchronization task in the Tasks section and view the detailed running process of the synchronization task.
Perform O&M operations on the synchronization task
View the status of the synchronization task
After the synchronization task is created, you can go to the Tasks page to view all synchronization tasks created in the workspace and the basic information of each task.

You can Start or Stop a synchronization task in the Actions column. You can also Edit or View a synchronization task in the More drop-down list.
For a started task, you can see the basic running status in Execution Overview, or click the corresponding overview area to view the execution details.

The single-table real-time synchronization task from Kafka to ApsaraDB for OceanBase consists of two steps:
Schema Migration: includes the creation method of the destination table (existing table or automatic table creation). If automatic table creation is selected, the DDL for table creation will be displayed.
Real-time Data Synchronization: shows statistics for real-time synchronization, including real-time read and write traffic, dirty data, failover, and operation logs.
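As a rough illustration of the Schema Migration step with automatic table creation, the sketch below builds a table-creation DDL statement from a list of output columns and refuses to proceed without a primary key, mirroring the requirement that the destination table must have one. Everything here is an assumption made for the example: the function name `build_create_table`, the MySQL-compatible syntax, and the column types. The DDL that DataWorks actually generates and displays may differ.

```python
def build_create_table(table, columns, primary_key):
    """Build a MySQL-style CREATE TABLE statement (assumed syntax).

    columns: list of (column_name, sql_type) pairs.
    primary_key: list of column names that form the primary key.
    """
    if not primary_key:
        raise ValueError("the destination table must have a primary key")
    names = [name for name, _ in columns]
    missing = [k for k in primary_key if k not in names]
    if missing:
        raise ValueError(f"primary key columns not in schema: {missing}")
    lines = [f"  `{name}` {sql_type}" for name, sql_type in columns]
    key_list = ", ".join(f"`{k}`" for k in primary_key)
    lines.append(f"  PRIMARY KEY ({key_list})")
    return f"CREATE TABLE IF NOT EXISTS `{table}` (\n" + ",\n".join(lines) + "\n);"


# Hypothetical table name and column types, for illustration only.
ddl = build_create_table(
    "orders",
    [("id", "BIGINT NOT NULL"), ("name", "VARCHAR(255)")],
    ["id"],
)
print(ddl)
```

Calling the function with an empty primary key list raises an error instead of producing a statement, which matches the rule that configurations without a primary key cannot be saved.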
Rerun the synchronization task
If you need to change the fields to synchronize, the fields in a destination table, or table name information, you can click Rerun in the Operation column of the desired synchronization task. The system then synchronizes the changes to the destination. Data in tables that are already synchronized and are not modified is not synchronized again.
To rerun the synchronization task with its current configurations, click Rerun without modifying the configurations.
To rerun the synchronization task with new configurations, modify the configurations and click Complete. Then click Apply Updates in the Operation column of the synchronization task so that the latest configurations take effect.