DataWorks Data Integration provides single-table real-time synchronization tasks for low-latency, high-throughput data replication between data sources. A real-time computing engine captures data changes (inserts, updates, and deletes) at the source and applies them to the destination. This topic uses real-time synchronization of a single table from Kafka to MaxCompute as an example to show how to configure such a task.
Preparations
- Data sources:
  - Create the source and destination data sources. For more information about data source configuration, see Data Source Management.
  - Ensure that the data sources support real-time synchronization. For more information, see Supported data sources and synchronization solutions.
  - Some data sources, such as Hologres and Oracle, require logging to be enabled. The method for enabling logs varies by data source. For more information, see Data Source List.
- Resource group: Purchase and configure a Serverless resource group.
- Network connectivity: Establish a network connection between the resource group and the data sources.
Step 1: Create a sync task
- Log on to the DataWorks console. In the target region, click in the left-side navigation pane. Select a workspace from the drop-down list and click Go to Data Integration.
- In the left-side navigation pane, click Synchronization Task. At the top of the page, click Create Synchronization Task and configure the task information. This example uses real-time synchronization from Kafka to MaxCompute:
  - Source Type: Kafka.
  - Destination Type: MaxCompute.
  - Specific Type: Single-table real-time.
  - Synchronization Mode:
    - Schema Migration: Automatically creates objects in the destination, such as a table whose fields and data types match the source. This step does not synchronize any data.
    - Incremental Sync (optional): Continuously captures data changes (inserts, updates, and deletes) at the source and synchronizes them to the destination. If full synchronization is enabled, incremental synchronization starts after it completes.
    If the source is Hologres, full synchronization is also supported. It first synchronizes all existing data to the destination table, and incremental synchronization then starts automatically.
For more information about supported data sources and synchronization solutions, see Supported data sources and synchronization solutions.
Step 2: Configure data sources and execution resources
- For Source Information, select your Kafka data source. For Destination, select your MaxCompute data source.
- In the Running Resources section, select the Resource Group for the sync task and assign Resource Group CUs to the task. You can set CUs separately for full synchronization and incremental synchronization to match resources to each phase and avoid waste. If the sync task fails with an out-of-memory (OOM) error, increase the number of CUs assigned to the task.
- Ensure that both the source and destination data sources pass the Connectivity Check.
Step 3: Configure the synchronization solution
1. Configure the source
- On the Configuration tab, select the Kafka topic to synchronize. You can keep the default values for the other settings or modify them as needed. For more information about the parameters, see the official Kafka documentation.
- In the upper-right corner, click Data Sampling. In the dialog box that appears, set the Start time and Sampled Data Records, and then click Start Collection. This samples data from the specified Kafka topic so that you can preview it. The sampled data also serves as input for previewing and configuring the downstream data processing nodes.
- On the Configure Output Field tab, select the fields to synchronize.
By default, the Kafka source provides the following six fields:

| Field Name | Description |
|---|---|
| __key__ | The key of the Kafka record. |
| __value__ | The value of the Kafka record. |
| __partition__ | The number of the partition that contains the Kafka record. Partition numbers are integers starting from 0. |
| __headers__ | The headers of the Kafka record. |
| __offset__ | The offset of the Kafka record within its partition. Offsets are integers starting from 0. |
| __timestamp__ | The 13-digit UNIX timestamp of the Kafka record, in milliseconds. |
You can also perform more field transformations in subsequent data processing nodes.
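To make the six default fields concrete, the following Python sketch models one consumed Kafka record as a plain dictionary keyed by these field names and converts the 13-digit millisecond __timestamp__ into a UTC datetime. The dictionary layout, the sample values, and the helper name record_event_time are illustrative assumptions, not the internal representation that DataWorks uses.

```python
from datetime import datetime, timezone

# Hypothetical in-memory view of one Kafka record, keyed by the six default
# output field names of the Kafka source. All values are made-up sample data;
# headers are shown as an empty dict purely for simplicity.
sample_record = {
    "__key__": "user-42",
    "__value__": '{"id": 42, "name": "Alice", "age": 30}',
    "__partition__": 0,              # partition numbers start from 0
    "__headers__": {},
    "__offset__": 0,                 # offsets within a partition start from 0
    "__timestamp__": 1700000000000,  # 13-digit UNIX timestamp in milliseconds
}


def record_event_time(record: dict) -> datetime:
    """Convert the millisecond __timestamp__ field to a timezone-aware UTC datetime."""
    millis = record["__timestamp__"]
    if len(str(millis)) != 13:
        raise ValueError(f"expected a 13-digit millisecond timestamp, got {millis}")
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


if __name__ == "__main__":
    print(record_event_time(sample_record).isoformat())  # 2023-11-14T22:13:20+00:00
```

Dividing by 1000 is the key step: passing the raw 13-digit value to a seconds-based API would place the record thousands of years in the future.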
2. Data processing
Enable the Data Processing toggle. Five data processing methods are available: data masking, string replace, data filtering, JSON parsing, and edit and assign fields. You can arrange these methods in your preferred order. At runtime, they are executed sequentially.
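Because the configured methods run one after another, their order matters. The Python sketch below illustrates sequential execution with three of the five methods (JSON parsing, data filtering, and data masking) written as plain functions over a row dictionary. The function names, the masking rule, and the age filter are hypothetical examples chosen for illustration; they are not the DataWorks processing components themselves.

```python
import json
from typing import Callable, Optional

# A hypothetical processing step: takes a row and returns a new row,
# or None to drop the row (as a data filtering step would).
Step = Callable[[dict], Optional[dict]]


def json_parse(row: dict) -> Optional[dict]:
    """Expand the JSON object stored in __value__ into top-level fields."""
    return {**row, **json.loads(row["__value__"])}


def filter_adults(row: dict) -> Optional[dict]:
    """Keep rows whose age is at least 18; rows without an age are dropped."""
    return row if row.get("age", 0) >= 18 else None


def mask_name(row: dict) -> Optional[dict]:
    """Mask every character of the name field except the first one."""
    name = row.get("name", "")
    return {**row, "name": name[:1] + "*" * max(len(name) - 1, 0)}


def run_pipeline(row: dict, steps: list[Step]) -> Optional[dict]:
    """Apply the steps in the configured order; stop as soon as a step drops the row."""
    for step in steps:
        row = step(row)
        if row is None:
            return None
    return row


if __name__ == "__main__":
    steps = [json_parse, filter_adults, mask_name]
    kept = run_pipeline({"__value__": '{"id": 1, "name": "Alice", "age": 30}'}, steps)
    dropped = run_pipeline({"__value__": '{"id": 2, "name": "Bob", "age": 12}'}, steps)
    print(kept["name"])  # A****
    print(dropped)       # None
```

In this sketch, placing filter_adults before json_parse would drop every row, because the age field has not been extracted from __value__ yet. Arranging the methods in the configured order avoids this kind of problem.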
After you configure a data processing node, you can click Preview Data Output in the upper-right corner:
- The Input Data table shows the results of the earlier Data Sampling step. To refresh them, click Re-obtain Output of Ancestor Node.
- If the upstream node has no output, you can use Manually Construct Data to simulate it.
- Click Preview to view how the data processing component transforms the upstream output.
The Input Data table displays Kafka message fields such as key, value, partition, offset, and timestamp. The Preview Result section shows the fields parsed after data processing, such as key-value pairs from JSON parsing. It also indicates the number of dirty data records found and includes a disclaimer: "The preview shown here is for reference only. The results of the actual task execution apply."
Data output preview and data processing both depend on Data Sampling from the Kafka source, so sample data from the Kafka source before you configure data processing.
3. Configure the destination
- In the Destination section, select a Tunnel resource group. The default is "Public Transport Resources", which corresponds to the free quota of MaxCompute.
- Specify whether to write to a new table or an existing table:
  - To create a new table, select Create from the drop-down list. By default, a table with the same structure as the source is created automatically. You can manually change the destination table name and schema.
  - To use an existing table, select the destination table from the drop-down list.
- (Optional) Edit the table schema. Click the edit icon next to the table name to modify its schema. You can click Re-generate Table Schema Based on Output Column of Ancestor Node to generate the schema automatically from the output columns of the upstream node, and then select a column in the generated schema as the primary key.
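Generating a destination schema from upstream output columns amounts to translating each column's type. The Python sketch below builds a MaxCompute CREATE TABLE statement using only the two type conversions this topic states for the Kafka example (LONG to BIGINT, STRING unchanged). The function name, the table name kafka_users, and the error on unmapped types are assumptions for illustration; the DDL that DataWorks actually generates may differ, for example in partitioning, primary key, or table properties.

```python
# Hypothetical type mapping, limited to the cases stated in this topic:
# LONG becomes BIGINT in MaxCompute and STRING stays STRING.
TYPE_MAPPING = {"LONG": "BIGINT", "STRING": "STRING"}


def build_create_table(table: str, upstream_columns: list[tuple[str, str]]) -> str:
    """Build a MaxCompute CREATE TABLE statement from (column name, upstream type) pairs."""
    column_defs = []
    for name, upstream_type in upstream_columns:
        try:
            target_type = TYPE_MAPPING[upstream_type.upper()]
        except KeyError:
            raise ValueError(f"no mapping defined for upstream type {upstream_type!r}") from None
        column_defs.append(f"  `{name}` {target_type}")
    return f"CREATE TABLE IF NOT EXISTS `{table}` (\n" + ",\n".join(column_defs) + "\n);"


if __name__ == "__main__":
    print(build_create_table("kafka_users", [("id", "LONG"), ("name", "STRING"), ("age", "LONG")]))
```

Failing loudly on an unmapped type, rather than guessing a default, keeps a schema mismatch visible before any data is written.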
4. Configure field mapping
After you select the source and destination, you must specify the mapping between source and destination columns. The task writes data from the source fields to the corresponding destination fields based on the configured field mapping.
- The system automatically maps upstream columns to destination columns based on the same-name mapping principle. You can adjust the mappings as needed. One upstream column can be mapped to multiple destination columns, but multiple upstream columns cannot be mapped to the same destination column. Data in an unmapped upstream column is not written to the destination table.
- For Kafka fields, you can configure custom JSON parsing: use a data processing component to extract the content of the __value__ field for finer-grained field configuration.
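The mapping rules above can be expressed as a few checks. The following Python sketch pairs columns by identical name, rejects any mapping in which two upstream columns target one destination column, and writes only mapped values. The helper names and the sample columns (including the extra destination column name_copy) are hypothetical; this illustrates the stated rules, not how DataWorks enforces them internally.

```python
from collections import Counter


def map_by_same_name(upstream: list[str], destination: list[str]) -> list[tuple[str, str]]:
    """Pair each upstream column with the destination column of the same name."""
    dest = set(destination)
    return [(col, col) for col in upstream if col in dest]


def validate_mapping(mapping: list[tuple[str, str]]) -> None:
    """Reject mappings in which several upstream columns target one destination column."""
    targets = Counter(dst for _, dst in mapping)
    duplicates = sorted(dst for dst, n in targets.items() if n > 1)
    if duplicates:
        raise ValueError(f"multiple upstream columns map to: {', '.join(duplicates)}")


def apply_mapping(row: dict, mapping: list[tuple[str, str]]) -> dict:
    """Build the destination row; unmapped upstream columns are not written."""
    return {dst: row[src] for src, dst in mapping}


if __name__ == "__main__":
    mapping = map_by_same_name(["id", "name", "age", "__offset__"],
                               ["id", "name", "age", "name_copy"])
    mapping.append(("name", "name_copy"))  # one upstream column to two destinations is allowed
    validate_mapping(mapping)
    print(apply_mapping({"id": 1, "name": "Alice", "age": 30, "__offset__": 7}, mapping))
```

Here __offset__ has no same-name destination column, so it is left unmapped and its value is not written.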
In the field mapping section, click Map by Same Name to create a one-to-one correspondence between upstream input fields and MaxCompute output fields. The mapping includes the Kafka metadata fields (__key__, __value__, __partition__, __offset__, __timestamp__, and __headers__) and the business fields (id, name, and age). LONG fields are mapped to the MaxCompute BIGINT type, and STRING fields remain STRING.
- (Optional) Configure partitions:
  - Automatic Time-based Partitioning creates partitions based on the business time, which in this example is the __timestamp__ field. The first-level partition is by year, the second-level partition by month, and so on.
  - Dynamic Partitioning by Field Content maps a field of the source table to a partition field of the destination MaxCompute table, so that each row is written to the partition that corresponds to the value of that source field.
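Both partitioning modes boil down to computing partition values from each row. The Python sketch below derives year/month/day values from the millisecond __timestamp__ and, separately, takes a partition value directly from a source field. The partition column names (year, month, day, region_pt), the source field region, and the use of UTC are assumptions for illustration; the actual partition names, depth, and time zone depend on your task and project settings.

```python
from datetime import datetime, timezone


def time_partition(timestamp_ms: int) -> dict[str, str]:
    """Derive year/month/day partition values from a millisecond timestamp (UTC assumed)."""
    t = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return {"year": f"{t.year:04d}", "month": f"{t.month:02d}", "day": f"{t.day:02d}"}


def field_partition(row: dict, source_field: str, partition_column: str) -> dict[str, str]:
    """Use the value of a source field as the value of a destination partition column."""
    return {partition_column: str(row[source_field])}


def partition_spec(values: dict[str, str]) -> str:
    """Render partition values as a level-ordered spec such as year=2023/month=11."""
    return "/".join(f"{col}={val}" for col, val in values.items())


if __name__ == "__main__":
    print(partition_spec(time_partition(1700000000000)))  # year=2023/month=11/day=14
    print(partition_spec(field_partition({"region": "cn-hangzhou"}, "region", "region_pt")))
```

Zero-padding month and day keeps partition values sortable as strings, which is a common convention for string-typed partition columns.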
Step 4: Advanced configuration
The sync task provides advanced parameters for fine-grained configuration. The system provides default values, which you do not need to change in most cases. To modify them:
- In the upper-right corner of the page, click Advanced Settings to open the Advanced Parameters configuration page.
  Note: In Data Development, advanced configuration is on a tab on the right side of the task configuration page.
- You can set parameters separately for the reader and writer of the sync task. To customize the Runtime Configuration, set Automatically set runtime configuration to false.
- Modify the parameter values based on the tooltips; the description of each parameter is displayed next to its name. For configuration suggestions for some parameters, see Advanced parameters for real-time synchronization.
Modify these parameters only after you fully understand their purpose and potential consequences. Incorrect settings can cause unexpected errors or data quality issues.
Step 5: Test run
After you complete all task configurations, click Perform Simulated Running in the lower-left corner to debug the task. The simulation processes a small amount of sample data and lets you preview the results in the destination table. The system reports configuration errors, exceptions, and dirty data in real time, so you can quickly check whether the task is configured correctly and produces the expected results.
- In the dialog box that appears, set the sampling parameters: Start time and Sampled Data Records.
- Click Start Collection to retrieve the sample data.
- Click Preview Result to simulate the task run and view the output.
The output of the test run is for preview only. It is not written to the destination data source and does not affect production data.
Step 6: Publish and run
- After you complete all configurations, click Save at the bottom of the page to save the task configuration.
- Data Integration tasks run only after they are published to the production environment, so you must deploy any new or edited task for the changes to take effect. During deployment, if you select Start immediately after deployment, the task starts automatically. Otherwise, after the task is published, go to the task list and manually start the task from the Actions column.
- In the Tasks list, click the task's Name/ID to view its detailed execution process.
Step 7: Configure alert rules
After the task is published and running, you can configure alert rules to receive immediate notifications about exceptions. This helps keep your production environment stable and your data fresh. In the task list of Data Integration, find the target task, and in the Actions column, click .
1. Add an alert
Select Use custom rules, and enter an Alert Name and Description. Supported notification methods include Email, SMS, Phone, DingTalk, webhook, and Feishu. You can configure these methods independently for WARNING and CRITICAL levels. Recipients can be specified through an on-call schedule or added manually.
(1) Click Create Rule to configure an alert rule.
You can set the Alert Reason to monitor metrics such as Business delay, failover, Task status, DDL Notification, and Task Resource Utilization. You can then set CRITICAL or WARNING alert levels based on specified thresholds.
- After you set the alert method, you can use Configure Advanced Parameters to control the interval between alert notifications, which prevents message flooding.
- If you select Business delay, Task status, or Task Resource Utilization as the Alert Reason, you can also enable recovery notifications to inform recipients when the task returns to normal.
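Threshold-based levels and a notification interval work together: the thresholds decide whether and how severely to alert, and the interval suppresses repeats. The Python sketch below models a hypothetical business-delay rule with WARNING and CRITICAL thresholds in seconds plus a minimum gap between notifications. The class name, field names, and suppression behavior are assumptions for illustration only, not the DataWorks alerting implementation; recovery notifications are not modeled.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DelayAlertRule:
    """Hypothetical business-delay alert rule; all durations are in seconds."""
    warning_seconds: int
    critical_seconds: int
    min_interval_seconds: int  # minimum gap between two notifications
    _last_sent: Optional[float] = field(default=None, repr=False)

    def level(self, delay_seconds: int) -> Optional[str]:
        """Classify a delay as CRITICAL, WARNING, or healthy (None)."""
        if delay_seconds >= self.critical_seconds:
            return "CRITICAL"
        if delay_seconds >= self.warning_seconds:
            return "WARNING"
        return None

    def should_notify(self, delay_seconds: int, now: float) -> Optional[str]:
        """Return the level to send, or None if healthy or still inside the interval."""
        level = self.level(delay_seconds)
        if level is None:
            return None
        if self._last_sent is not None and now - self._last_sent < self.min_interval_seconds:
            return None
        self._last_sent = now
        return level


if __name__ == "__main__":
    rule = DelayAlertRule(warning_seconds=300, critical_seconds=900, min_interval_seconds=600)
    print(rule.should_notify(400, now=0))     # WARNING
    print(rule.should_notify(1000, now=60))   # None (suppressed by the interval)
    print(rule.should_notify(1000, now=700))  # CRITICAL
```

In this simplified sketch the interval also suppresses an escalation from WARNING to CRITICAL within the window; a real alerting system may treat escalations differently.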
(2) Manage alert rules.
For existing alert rules, you can use the toggle to enable or disable them. You can also send alerts to different personnel based on the alert level.
2. View alerts
In the task list, expand to go to the alert events page and view historical alerts.
More operations
After the task starts, you can click the task name to view its running details and perform task operations and maintenance (O&M) and tuning.
FAQ
For answers to frequently asked questions about real-time synchronization tasks, see Real-time synchronization FAQ.
Reference
Real-time synchronization of a single table from Kafka to ApsaraDB for OceanBase
Real-time ingestion of a single table from LogHub (SLS) to Data Lake Formation
Real-time synchronization of a single table from Hologres to Doris
Real-time synchronization of a single table from Hologres to Hologres
Real-time synchronization of a single table from Kafka to Hologres
Real-time synchronization of a single table from LogHub (SLS) to Hologres
Real-time synchronization of a single table from Hologres to Kafka
Real-time synchronization of a single table from LogHub (SLS) to MaxCompute
Real-time synchronization of a single table from Kafka to an OSS data lake
Real-time synchronization of a single table from Kafka to StarRocks
Real-time synchronization of a single table from Oracle to Tablestore