Data Integration supports real-time synchronization of data from single tables in data sources such as Kafka and LogHub to OSS. This topic describes how to use DataWorks Data Integration to synchronize data from Kafka to an OSS data lake in real time.
Limits
The version of the Kafka service that you use must range from 0.10.2 to 2.2.0.
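Before you create the task, you can check a Kafka version string against this range. The following is a minimal sketch, not part of DataWorks: it assumes both bounds are inclusive, and the function names are hypothetical. It compares versions as integer tuples because a plain string comparison would rank 0.9.0 above 0.10.2.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted version string such as '2.2.0' into a tuple of integers."""
    return tuple(int(part) for part in version.split("."))


# Bounds taken from the limit above; treating them as inclusive is an assumption.
MIN_SUPPORTED = parse_version("0.10.2")
MAX_SUPPORTED = parse_version("2.2.0")


def is_supported_kafka_version(version: str) -> bool:
    """Return True when the version lies between 0.10.2 and 2.2.0."""
    parsed = parse_version(version)
    # Pad short versions so that '2.2' is compared as '2.2.0'.
    parsed = parsed + (0,) * (3 - len(parsed))
    return MIN_SUPPORTED <= parsed <= MAX_SUPPORTED


for candidate in ("0.9.0", "0.10.2", "1.1.1", "2.2.0", "2.3.0"):
    print(candidate, is_supported_kafka_version(candidate))
```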
Prerequisites
You have purchased a serverless resource group or an exclusive resource group for Data Integration.
You have created a Kafka data source and an OSS data source. For more information, see Create a data source for Data Integration.
You have established network connections between the resource group and data sources. For more information, see Network connectivity solutions.
Procedure
Step 1: Select a synchronization task type
Go to the Data Integration page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Integration.
In the left-side navigation pane, click Synchronization Task. Then, click Create Synchronization Task at the top of the page to go to the page for creating a synchronization task. Configure the following basic information:
Source And Destination: Kafka→OSS
New Node Name: Specify a name for the synchronization task.
Synchronization Method: Single table real-time.
Step 2: Configure network and resources
In the Network And Resource Configuration section, select the Resource Group that you want to use for the synchronization task. You can allocate Task Resource Usage in CUs for the task.
For Source Data Source, select the Kafka data source that you added. For Destination Data Source, select the OSS data source that you added, and then click Test Connectivity.
After you make sure that both the source and destination data sources are connected, click Next.
Step 3: Configure the synchronization link
1. Configure the Kafka data source
In the wizard in the upper part of the configuration page, click Kafka and edit the Kafka Source Information.

In the Kafka Source Information section, select the Kafka topic from which you want to synchronize data.
Retain default values for other parameters, or modify their configurations based on your business requirements.
Click Data Sampling in the upper-right corner.
In the dialog box that appears, configure the Start Time and Sampled Data Records parameters and click Start Collection. The system samples data from the Kafka topic that you specified. You can preview the data in the Kafka topic. The data in the Kafka topic is used as input data for data preview and visualization configurations of a data processing node.
In the Output Field Configuration section, select the fields that you want to synchronize.
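The effect of the Start Time and Sampled Data Records parameters, followed by output field selection, can be pictured with a small in-memory model. This is a conceptual sketch only: the source does not describe how DataWorks samples data internally, and the record layout, field names, and function names below are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical stand-in for messages in a Kafka topic, ordered by timestamp.
Record = Dict[str, Any]


def sample_records(records: List[Record], start_time_ms: int, max_records: int) -> List[Record]:
    """Collect up to max_records records with a timestamp at or after start_time_ms."""
    sampled: List[Record] = []
    for record in records:
        if len(sampled) >= max_records:
            break
        if record["timestamp"] >= start_time_ms:
            sampled.append(record)
    return sampled


def select_output_fields(record: Record, fields: List[str]) -> Record:
    """Keep only the selected output fields; all other fields are left out."""
    return {name: record[name] for name in fields if name in record}


topic = [
    {"key": "k1", "value": "a", "timestamp": 100},
    {"key": "k2", "value": "b", "timestamp": 200},
    {"key": "k3", "value": "c", "timestamp": 300},
    {"key": "k4", "value": "d", "timestamp": 400},
]

# Sample at most two records starting at timestamp 200, then keep two fields.
preview = [select_output_fields(r, ["key", "value"]) for r in sample_records(topic, 200, 2)]
print(preview)
```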
2. Configure a data processing node
You can click the icon to add data processing methods. The following data processing methods are supported: Data Masking, Replace String, Data Filtering, JSON Parsing, and Edit Field and Assign Value. You can arrange the data processing methods based on your business requirements. When the synchronization task is run, data is processed based on the processing order that you specify.

After you configure a data processing node, you can click Preview Data Output in the upper-right corner of the configuration page. In the dialog box that appears, you can click Re-obtain Output Of Ancestor Node to enable the data processing node to process the data that is sampled from the specified Kafka topic and preview the processing result.

Before you preview the result generated after the input data is processed by a data processing node, you must configure Data Sampling settings for the Kafka data source.
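Because data is processed in the order that you specify, the order of the processing methods affects the result. The following sketch models that idea as a chain of functions applied to sampled records. It is an illustration under stated assumptions, not the DataWorks implementation: the function names mirror the method names above but are hypothetical, as are the sample records and the masking rule (keep the last four characters).

```python
import json
from typing import Any, Callable, Dict, List, Optional

Record = Dict[str, Any]
Step = Callable[[Record], Optional[Record]]  # a step returns None to drop a record


def json_parsing(field: str) -> Step:
    """Parse the JSON string in `field` and add its top-level keys to the record."""
    def step(record: Record) -> Optional[Record]:
        return {**record, **json.loads(record[field])}
    return step


def data_filtering(predicate: Callable[[Record], bool]) -> Step:
    """Keep a record only when the predicate holds."""
    def step(record: Record) -> Optional[Record]:
        return record if predicate(record) else None
    return step


def data_masking(field: str, visible: int = 4) -> Step:
    """Replace all but the last `visible` characters of `field` with asterisks."""
    def step(record: Record) -> Optional[Record]:
        text = str(record[field])
        shown = min(visible, len(text))
        masked = "*" * (len(text) - shown) + text[len(text) - shown:]
        return {**record, field: masked}
    return step


def replace_string(field: str, old: str, new: str) -> Step:
    """Replace every occurrence of `old` with `new` in `field`."""
    def step(record: Record) -> Optional[Record]:
        return {**record, field: str(record[field]).replace(old, new)}
    return step


def assign_value(field: str, value: Any) -> Step:
    """Add or overwrite `field` with a constant value."""
    def step(record: Record) -> Optional[Record]:
        return {**record, field: value}
    return step


def run_steps(records: List[Record], steps: List[Step]) -> List[Record]:
    """Apply the steps to each record in the configured order."""
    output: List[Record] = []
    for record in records:
        current: Optional[Record] = record
        for step in steps:
            current = step(current)
            if current is None:  # the record was filtered out
                break
        if current is not None:
            output.append(current)
    return output


sampled = [
    {"value": '{"user": "alice", "phone": "13800001234", "env": "prod"}'},
    {"value": '{"user": "bob", "phone": "13900005678", "env": "test"}'},
]
steps = [
    json_parsing("value"),  # must run first: later steps read the parsed fields
    data_filtering(lambda r: r["env"] == "prod"),
    data_masking("phone"),
    replace_string("env", "prod", "production"),
    assign_value("source", "kafka"),
]
result = run_steps(sampled, steps)
print(result)
```

In this model, moving data_masking ahead of json_parsing fails because the phone field does not exist until the JSON string is parsed, which is why the arrangement of the methods matters.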
3. Configure the OSS data source
In the wizard in the upper part of the configuration page, click OSS and edit the OSS Destination Information.

In the OSS Destination Information section, configure the basic information about the OSS object to which you want to write data.
Write Format: The following formats are supported: Hudi, Paimon, and Iceberg.
Select Metadatabase Auto-build Location: If you have activated Data Lake Formation (DLF) within your Alibaba Cloud account, the system automatically creates a metadatabase and a metatable in DLF when data is synchronized to a data lake.
Note: Cross-region metadatabase creation is not supported.
Storage Path: Select the OSS path in which you want to store the synchronized data.
Destination Database: Select the name of the database to which you want to write data. You can also select Create Database to create a DLF metadatabase and specify a Database Name.
Destination Table: Select the generation method of the OSS object to which you want to write data. Valid values: Create Table and Use Existing Table.
Table Name: Enter or select the name of the OSS object to which you want to write data.
(Optional) Modify the schema of a destination table.
If you select Create Table for the Destination Table parameter, click Edit Table Schema. In the dialog box that appears, edit the schema of the destination table that will be automatically created. You can also click Re-generate Table Schema Based on Output Column of Ancestor Node to re-generate a schema based on the output columns of an ancestor node. You can select a column from the generated schema and configure the column as the primary key.
Configure mappings between fields in the source and fields in the destination.
After you complete the preceding configuration, the system automatically establishes mappings between fields in the source and fields in the destination based on the Map Fields with Same Name principle. You can modify the mappings based on your business requirements. One field in the source can map to multiple fields in the destination. Multiple fields in the source cannot map to the same field in the destination. If a field in the source has no mapped field in the destination, data in the field in the source is not synchronized to the destination.
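The mapping rules above can be expressed as a short sketch. It is an illustration only, not DataWorks code: the function names and field names are hypothetical. It shows same-name mapping, one source field feeding several destination fields, rejection of several source fields feeding one destination field, and unmapped source fields being left out.

```python
from typing import Any, Dict, List, Tuple

Mapping = Tuple[str, str]  # (source field, destination field)


def map_fields_with_same_name(source_fields: List[str], destination_fields: List[str]) -> List[Mapping]:
    """Pair each source field with the destination field of the same name, if any."""
    destinations = set(destination_fields)
    return [(name, name) for name in source_fields if name in destinations]


def validate_mappings(mappings: List[Mapping]) -> None:
    """Reject mappings in which several source fields write to one destination field."""
    source_of: Dict[str, str] = {}
    for source, destination in mappings:
        if destination in source_of and source_of[destination] != source:
            raise ValueError(
                f"'{source_of[destination]}' and '{source}' both map to '{destination}'"
            )
        source_of[destination] = source


def apply_mappings(record: Dict[str, Any], mappings: List[Mapping]) -> Dict[str, Any]:
    """Build the destination row; source fields without a mapping are not written."""
    validate_mappings(mappings)
    return {destination: record[source] for source, destination in mappings}


mappings = map_fields_with_same_name(["id", "name", "ts"], ["id", "name", "id_copy"])
mappings.append(("id", "id_copy"))  # one source field may map to multiple destination fields
row = apply_mappings({"id": 1, "name": "a", "ts": 1700000000}, mappings)
print(row)  # the unmapped source field "ts" is not synchronized
```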
4. Configure alert rules
To keep a failure of the synchronization task from delaying business data synchronization, you can configure alert rules for the synchronization task.
In the upper-right corner of the page, click Configure Alert Rule to go to the Alert Rule Configurations for Real-time Synchronization Subnode panel.
In the Configure Alert Rule panel, click Add Alert Rule. In the Add Alert Rule dialog box, configure the parameters for the alert rule.
Note: The alert rules that you configure in this step take effect for the real-time synchronization subtask that will be generated by the synchronization task. After the configuration of the synchronization task is complete, you can refer to Run and manage real-time synchronization tasks to go to the Real-time Synchronization Task page and modify alert rules configured for the real-time synchronization subtask.
Manage alert rules.
You can enable or disable alert rules that are created. You can also specify different alert recipients based on the severity levels of alerts.
5. Configure advanced parameters
DataWorks allows you to modify the configurations of specific parameters. You can change the values of these parameters based on your business requirements.
To prevent unexpected errors or data quality issues, we recommend that you understand the meanings of the parameters before you change the values of the parameters.
In the upper-right corner of the configuration page, click Configure Advanced Parameters.
In the Configure Advanced Parameters panel, change the values of the desired parameters.
Step 6: Configure DDL capabilities
DDL operations may be performed on the source. You can click Configure DDL Capability in the upper-right corner of the page to configure rules to process DDL messages from the source based on your business requirements.
For more information, see Configure rules to process DDL messages.
Step 7: Configure a resource group
You can click Configure Resource Group in the upper-right corner of the page to view and change the resource groups that are used to run the current synchronization task.
Step 8: Perform a test on the synchronization task
After the preceding configuration is complete, you can click Perform Simulated Running in the upper-right corner of the configuration page to enable the synchronization task to synchronize the sampled data to the destination table. You can view the synchronization result in the destination table. If specific configurations of the synchronization task are invalid, an exception occurs during the test run, or dirty data is generated, the system reports an error in real time. This helps you check the configurations early and determine whether the task produces the expected results.
In the dialog box that appears, configure the parameters for data sampling from the specified table, including the Start At and Sampled Data Records parameters.
Click Start Collection to enable the synchronization task to sample data from the source.
Click Preview to enable the synchronization task to synchronize the sampled data to the destination.
Step 9: Run the synchronization task
After the configuration of the synchronization task is complete, click Complete in the lower part of the page.
On the Synchronization Task page, find the created synchronization task and click Start in the Operation column.
Click the Name or ID of the synchronization task in the Tasks section and view the detailed running process of the synchronization task.
Perform O&M operations on the synchronization task
View the status of the synchronization task
After the synchronization task is created, you can go to the Synchronization Task page to view all synchronization tasks that are created in the workspace and the basic information of each synchronization task.

You can click Start or Stop in the Operation column to start or stop a synchronization task. You can also choose More to perform operations such as Edit and View on the synchronization task.
For a started task, you can view the basic running status of the task in the Execution Overview column. You can also click the corresponding overview area to view the execution details.

The real-time synchronization task from Kafka to OSS has the following stages:
Schema Migration: This tab displays information such as whether a destination object is a newly created object or an existing object. For a newly created object, the DDL statement that is used to create the object is displayed.
Real-time Synchronization: This tab displays statistics about real-time synchronization, including real-time synchronization details, DDL records, and alert information.
Rerun the synchronization task
In some special cases, such as when you want to modify the fields to synchronize, the fields in a destination table, or table name information, you can click Rerun in the Operation column of the desired synchronization task. The system then synchronizes the changes to the destination. Data in tables that are already synchronized and are not modified is not synchronized again. You can rerun the task in one of the following ways:
To rerun the task without modifying its configurations, click Rerun directly.
To rerun the task with new configurations, modify the configurations of the synchronization task and click Complete. Then, click Apply Updates in the Operation column of the synchronization task so that the latest configurations take effect.