Data Integration supports real-time synchronization of data from single tables in data sources such as Kafka and LogHub to OSS. This topic describes how to use DataWorks Data Integration to synchronize data from Kafka to an OSS data lake in real time.
Limits
The Kafka service version must be between 0.10.2 and 2.2.0 (inclusive).
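The version limit above can be checked before you create the task. The following is a minimal sketch, not part of DataWorks: the function names are hypothetical, and it assumes you already know your Kafka version as a dotted string. It compares version components numerically, because a plain string comparison would rank "0.9" above "0.10".

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted version string such as '0.10.2' into a tuple of ints."""
    return tuple(int(part) for part in version.strip().split("."))


# Bounds taken from the limit stated above (both ends inclusive).
MIN_KAFKA_VERSION = parse_version("0.10.2")
MAX_KAFKA_VERSION = parse_version("2.2.0")


def is_supported_kafka_version(version: str) -> bool:
    """Return True if the version lies within the supported range."""
    parsed = parse_version(version)
    # Pad to three components so that '2.2' is treated like '2.2.0'.
    parsed = parsed + (0,) * (3 - len(parsed))
    return MIN_KAFKA_VERSION <= parsed <= MAX_KAFKA_VERSION
```

For example, `is_supported_kafka_version("1.1.1")` is true, while `"0.9.0"` and `"2.2.1"` fall outside the range.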
Prerequisites
Before you begin, make sure you have:
-
A serverless resource group or an exclusive resource group for Data Integration
-
A Kafka data source and an OSS data source. For more information, see Create a data source for Data Integration
-
Network connectivity established between the resource group and both data sources. For more information, see Network connectivity solutions
Create a real-time synchronization task
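The configuration involves nine steps:
-
Select a synchronization task type
-
Configure network and resources
-
Configure the synchronization link
-
Configure alert rules (optional)
-
Configure advanced parameters (optional)
-
Configure DDL capabilities (optional)
-
Configure a resource group (optional)
-
Test the synchronization task
-
Run the synchronization task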
Step 1: Select a synchronization task type
-
Go to the Data Integration page. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Integration > Data Integration. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Integration.
-
In the left-side navigation pane, click Synchronization Task, then click Create Synchronization Task at the top of the page. Configure the following basic settings:
| Setting | Value |
| --- | --- |
| Source and destination | Kafka→OSS |
| New node name | A name for the synchronization task |
| Synchronization method | Single table real-time |
Step 2: Configure network and resources
-
In the Network And Resource Configuration section, select the Resource Group to use for the synchronization task. Allocate Task Resource Usage in Compute Units (CUs) as needed.
-
For Source Data Source, select the added Kafka data source. For Destination Data Source, select the added OSS data source, then click Test Connectivity.
-
After confirming that both data sources are connected, click Next.
Step 3: Configure the synchronization link
The synchronization link has three parts: the Kafka source, an optional data processing node, and the OSS destination. Configure them in order using the wizard at the top of the configuration page.
Configure the Kafka data source
Click Kafka in the wizard to open Kafka Source Information.
-
In the Kafka Source Information section, select the Kafka topic to synchronize. Adjust other parameters based on your requirements.
-
Click Data Sampling in the upper-right corner. In the dialog box, set Start Time and Sampled Data Records, then click Start Collection. The system samples data from the specified topic for preview and downstream data processing configuration.
-
In the Output Field Configuration section, select the fields to synchronize.
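The two Data Sampling parameters can be pictured with a small in-memory model. This is only a sketch of the idea, assuming that Start Time acts as a lower bound on message timestamps and Sampled Data Records as an upper bound on the number of records returned. The `Record` type and `sample_records` function are hypothetical and do not reflect how the service is implemented.

```python
from datetime import datetime
from typing import Iterable, List, NamedTuple


class Record(NamedTuple):
    timestamp: datetime  # when the message was written to the topic
    value: str           # raw message payload


def sample_records(
    records: Iterable[Record], start_time: datetime, max_records: int
) -> List[Record]:
    """Collect up to max_records records whose timestamp is at or after start_time."""
    sampled: List[Record] = []
    for record in sorted(records, key=lambda r: r.timestamp):
        if record.timestamp < start_time:
            continue  # older than the requested start time
        if len(sampled) >= max_records:
            break  # sample size reached
        sampled.append(record)
    return sampled
```

If no message is at or after the start time, the sample is empty, which would leave nothing to preview in the later steps.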
Configure a data processing node
Click the icon to add one or more data processing methods.
Arrange the methods in the order you want them applied. When the task runs, data is processed in that sequence.
To preview the processed output, click Preview Data Output in the upper-right corner, then click Re-obtain Output Of Ancestor Node in the dialog box.
Configure Data Sampling for the Kafka source before previewing processing output.
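Because processing methods run in the order you arrange them, the same two methods can give different results when swapped. The sketch below illustrates this with two hypothetical steps, a masking step and a filter step, chained by a small helper. The step names and the row layout are assumptions made for illustration, not DataWorks features.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, str]
# A processing step returns the transformed row, or None to drop the row.
Step = Callable[[Row], Optional[Row]]


def mask_phone(row: Row) -> Optional[Row]:
    """Hypothetical masking step: keep only the last four characters of 'phone'."""
    phone = row.get("phone", "")
    return {**row, "phone": "*" * max(len(phone) - 4, 0) + phone[-4:]}


def drop_test_numbers(row: Row) -> Optional[Row]:
    """Hypothetical filter step: drop rows whose 'phone' starts with '000'."""
    return None if row.get("phone", "").startswith("000") else row


def run_steps(row: Row, steps: List[Step]) -> Optional[Row]:
    """Apply the steps in list order; stop as soon as a step drops the row."""
    current: Optional[Row] = row
    for step in steps:
        current = step(current)
        if current is None:
            return None
    return current
```

With the row `{"phone": "0001234567"}`, filtering first drops the row, whereas masking first hides the `000` prefix from the filter and the row passes through as `{"phone": "******4567"}`.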
Configure the OSS destination
Click OSS in the wizard to open OSS Destination Information.
-
In the OSS Destination Information section, configure the following settings:
| Setting | Description |
| --- | --- |
| Write Format | The open table format for the destination: Hudi, Paimon, or Iceberg |
| Select Metadatabase Auto-build Location | If Data Lake Formation (DLF) is activated in your account, the system automatically creates a metadatabase and metatable in DLF when data is synchronized. Cross-region metadatabase creation is not supported. |
| Storage Path | The OSS path where synchronized data is stored |
| Destination Database | Select an existing database, or select Create Database and specify a Database Name to create a DLF metadatabase |
| Destination Table | Select Create Table to create a new OSS object, or Use Existing Table to write to an existing one |
| Table Name | The name of the OSS object |
-
(Optional) If you select Create Table for the Destination Table parameter, click Edit Table Schema to modify the destination table schema. In the dialog box, edit the schema directly or click Re-generate Table Schema Based on Output Column of Ancestor Node to regenerate it from upstream output columns. Select a column to set it as the primary key.
-
Review the field mappings. The system automatically maps fields by name (Map Fields with Same Name principle). Adjust mappings as needed:
-
One source field can map to multiple destination fields.
-
Multiple source fields cannot map to the same destination field.
-
Source fields with no mapped destination field are not synchronized.
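The three mapping rules above can be expressed as a small check over a list of (source, destination) pairs. This is a sketch under the assumption that mappings are represented as plain string pairs. The function name and return shape are hypothetical and are not part of any DataWorks API.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def check_field_mappings(
    source_fields: Iterable[str], mappings: Iterable[Tuple[str, str]]
) -> Tuple[Dict[str, List[str]], List[str]]:
    """Return (conflicts, unsynchronized) for a set of (source, destination) pairs.

    conflicts: destination fields fed by more than one source field (not allowed).
    unsynchronized: source fields with no destination (these are skipped).
    """
    sources_by_destination: Dict[str, Set[str]] = defaultdict(set)
    mapped_sources: Set[str] = set()
    for source, destination in mappings:
        sources_by_destination[destination].add(source)
        mapped_sources.add(source)
    conflicts = {
        dest: sorted(srcs)
        for dest, srcs in sources_by_destination.items()
        if len(srcs) > 1
    }
    unsynchronized = [f for f in source_fields if f not in mapped_sources]
    return conflicts, unsynchronized
```

Mapping one source field to two destination fields produces no conflict, while mapping two source fields to one destination field is reported under that destination's name.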
Step 4: Configure alert rules
Alert rules notify you when the synchronization task fails or experiences issues, helping prevent latency in downstream data pipelines.
-
In the upper-right corner, click Configure Alert Rule to open the Alert Rule Configurations for Real-time Synchronization Subnode panel.
-
Click Add Alert Rule and configure the parameters.
Alert rules configured here apply to the real-time synchronization subtask generated by this task. After setup is complete, go to the Real-time Synchronization Task page to modify them. For more information, see Run and manage real-time synchronization tasks.
-
Enable or disable alert rules as needed. Set different alert recipients based on severity level.
Step 5: Configure advanced parameters
-
In the upper-right corner, click Configure Advanced Parameters.
-
In the Configure Advanced Parameters panel, change the parameter values.
Understand each parameter before changing its value to avoid unexpected errors or data quality issues.
Step 6: Configure DDL capabilities
Data Definition Language (DDL) operations may be performed on the source during synchronization. To handle them, click Configure DDL Capability in the upper-right corner and define rules for processing DDL messages.
For more information, see Configure rules to process DDL messages.
Step 7: Configure a resource group
To view or change the resource groups used by this synchronization task, click Configure Resource Group in the upper-right corner.
Step 8: Test the synchronization task
Run a simulated test to validate the configuration before going live. The system reports errors in real time if configurations are invalid, exceptions occur, or dirty data is generated.
-
In the upper-right corner, click Perform Simulated Running.
-
In the dialog box, set Start At and Sampled Data Records.
-
Click Start Collection to sample data from the source.
-
Click Preview to synchronize the sampled data to the destination and verify the result.
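The idea behind a simulated run can be sketched as converting each sampled row to the destination column types and collecting the rows that fail. This sketch assumes that "dirty data" means a record that cannot be converted to the destination schema. The schema representation and the `dry_run` function are hypothetical and do not describe how the service works internally.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical destination schema: column name -> converter that raises on bad input.
Schema = Dict[str, Callable[[Any], Any]]


def dry_run(
    rows: List[Dict[str, Any]], schema: Schema
) -> Tuple[List[Dict[str, Any]], List[Tuple[int, str]]]:
    """Convert each sampled row; collect (row index, reason) for rows that fail."""
    clean: List[Dict[str, Any]] = []
    dirty: List[Tuple[int, str]] = []
    for index, row in enumerate(rows):
        try:
            clean.append({col: convert(row[col]) for col, convert in schema.items()})
        except (KeyError, TypeError, ValueError) as exc:
            # A missing column or a failed conversion marks the row as dirty.
            dirty.append((index, f"{type(exc).__name__}: {exc}"))
    return clean, dirty
```

Running this against a handful of sampled rows gives a quick indication of whether the field types chosen for the destination table match the data actually arriving from the topic.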
Step 9: Run the synchronization task
-
Click Complete at the bottom of the page to finish configuration.
-
On the Data Integration > Synchronization Task page, find the task and click Start in the Operation column.
-
Click the task's Name or ID to view the detailed execution process.
Manage the synchronization task
View task status
After the task starts, go to the Synchronization Task page to see all tasks in the workspace and their status.
-
Click Start or Stop in the Operation column to start or stop a task. Use More to access Edit, View, and other operations.
-
For a running task, check the Execution Overview column for basic status. Click the overview area for execution details.
The real-time synchronization task has two execution stages:
-
Schema Migration: Shows whether the destination object is newly created or existing. For a newly created object, the DDL statement used to create it is displayed.
-
Real-time Synchronization: Shows real-time synchronization statistics, DDL records, and alert information.
Rerun the synchronization task
Use Rerun when you need to apply changes to the synchronized fields, destination table fields, or table name. Tables that are already synchronized and unchanged are not re-synchronized.
You can rerun a task in two ways:
-
Rerun without changes: Click Rerun in the Operation column to rerun the task with the current configuration.
-
Rerun with updated configuration: Modify the task configuration, click Complete, then click Apply Updates in the Operation column for the latest configuration to take effect.