
DataWorks:Offline synchronization of a single table from Kafka to MaxCompute

Last Updated: Nov 27, 2025

This topic describes how to configure a recurring schedule to synchronize incremental data from Kafka to MaxCompute. The example covers synchronizing data by the minute, hour, or day to hourly or daily partitioned tables in MaxCompute.

Precautions

  • Your Kafka version must be 0.10.2 or later and 2.2.x or earlier. Kafka must have record timestamps enabled, and the records must contain correct business timestamps.

  • After incremental data synchronization starts, records with timestamps that are earlier than or equal to the start time might still be written to the Kafka topic. These records might not be read. If data writes to the Kafka topic are delayed or timestamps are out of order, be aware of the risk of data loss for the offline synchronization task.

  • For the synchronization end policy on the Kafka side, you can select No new data for 1 minute only if the following conditions are met. Otherwise, data loss may occur.

    • Some or all partitions in the Kafka topic have no new data written for a long time, such as for more than 10 minutes.

    • After each recurring instance starts, no records with timestamps earlier than the end time parameter are written to the Kafka topic.

Prerequisites

Limits

Syncing source data to MaxCompute foreign tables is not supported.

Procedure

Note

This topic uses the new DataStudio interface to demonstrate how to configure an offline synchronization task.

Step 1: Create a node and configure the task

For the general steps to create and configure a node in the codeless UI, see the Codeless UI configuration guide. This topic does not repeat those steps.

Step 2: Configure the data source and destination

Configure the data source (Kafka)

In this tutorial, the data source is Kafka. The key configuration points are as follows.

Note

For a general description of the configuration items for a Kafka data source, see the Kafka Reader document. The following are reference configurations for this tutorial.

Configuration item

Key configuration

Topic

Select the Kafka topic to synchronize. If you use a standard mode DataWorks workspace, a topic with the same name must exist in the Kafka clusters for both the development and production environments, and you select that topic here.

Note

  • If the topic does not exist in the development environment, you cannot find the topic in the Topic drop-down list when you configure the offline synchronization node.

  • If the topic does not exist in the production environment, the recurring schedule for the task will fail after the task is submitted and published. This is because the task cannot find the topic to synchronize.

Consumer Group ID

Enter an ID that is unique within the Kafka cluster, based on your business needs. The consumer group ID is used for statistics and monitoring on the Kafka cluster side.

Read Start Offset, Start Time, Read End Offset, End Time

Set both Read Start Offset and Read End Offset to Specific Time. Set Start Time and End Time to the scheduling parameters ${startTime} and ${endTime}, respectively.

These parameters define the start and end positions for data synchronization. This configuration means that synchronization starts with data from ${startTime} and ends with data from ${endTime}. The ${startTime} and ${endTime} parameters are replaced with values based on the scheduling parameter expressions when the synchronization task runs. A short sketch after this table illustrates how a timestamp-based read position maps to Kafka offsets.

Time Zone

You can leave this empty or select the default server time zone of the region where DataWorks is located.

Note

If you have contacted Alibaba Cloud technical support to change the scheduling time zone, you can select your modified time zone here.

Key Type, Value Type, Encoding

Select based on the actual records in the Kafka topic.

Synchronization Completion Policy

For the synchronization end policy, select No new data for 1 minute only if both of the following conditions are met. Otherwise, select Stop at the specified end position.

  • Some or all partitions in the Kafka topic have no new data written for a long time, such as for more than 10 minutes.

  • After each recurring instance starts, no records with timestamps earlier than the end time parameter are written to the Kafka topic.

Advanced Configuration

Keep the default settings.
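Setting Read Start Offset and Read End Offset to Specific Time means that the task resolves per-partition read positions from record timestamps. Conceptually, this corresponds to Kafka's timestamp-to-offset lookup. The following Python sketch, based on the kafka-python client, only illustrates that lookup and is not the DataWorks implementation; the broker address, topic, and consumer group ID are placeholders.

    from datetime import datetime, timedelta, timezone
    from kafka import KafkaConsumer, TopicPartition

    # Placeholders: replace with your broker address, topic, and consumer group ID.
    consumer = KafkaConsumer(
        bootstrap_servers="broker-host:9092",
        group_id="dataworks_sync_demo",
        enable_auto_commit=False,
    )
    topic = "demo_topic"

    # Business time range, for example 09:52 to 09:57 on 2022-11-22 (UTC+8), in epoch milliseconds.
    tz = timezone(timedelta(hours=8))
    start_ms = int(datetime(2022, 11, 22, 9, 52, tzinfo=tz).timestamp() * 1000)
    end_ms = int(datetime(2022, 11, 22, 9, 57, tzinfo=tz).timestamp() * 1000)

    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

    # offsets_for_times returns, per partition, the earliest offset whose record
    # timestamp is greater than or equal to the given timestamp (or None if no
    # such record exists). This is how a timestamp-based start or end position
    # translates into concrete per-partition offsets.
    start_offsets = consumer.offsets_for_times({tp: start_ms for tp in partitions})
    end_offsets = consumer.offsets_for_times({tp: end_ms for tp in partitions})

    for tp in partitions:
        start = start_offsets[tp].offset if start_offsets[tp] else None
        end = end_offsets[tp].offset if end_offsets[tp] else None
        print(f"partition {tp.partition}: read offsets [{start}, {end})")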

Configure the data destination (MaxCompute)

In this tutorial, the data destination is a MaxCompute table. The key configuration points are as follows.

Note

For parameters not described in the following table, you can keep the default settings.

Configuration item

Key configuration

Data Source

This shows the MaxCompute data source selected in the previous step. If you use a standard mode DataWorks workspace, the names of the development and production projects are displayed.

Table

Select the MaxCompute table to synchronize to. If you use a standard mode DataWorks workspace, make sure that a MaxCompute table with the same name and schema exists in both the development and production environments.

You can also click Generate Target Table Schema. The system automatically creates a table to receive data. You can manually adjust the table creation statement.

Note

  • If the destination MaxCompute table does not exist in the development environment, you cannot find the table in the drop-down list when you configure the destination table for the offline synchronization node.

  • If the destination MaxCompute table does not exist in the production environment, the synchronization task will fail when it runs on a schedule after being submitted and published. This is because the task cannot find the destination table.

  • If the table schemas in the development and production environments are inconsistent, the column mapping during the scheduled run may differ from the mapping configured in the offline synchronization node. This can lead to incorrect data writes.

Partition

If the table is a partitioned table, you can enter values for the partition key columns.

  • The value can be a static field, such as ds=20220101.

  • The value can be a scheduling system parameter, such as ds=${partition}. The scheduling system parameter is automatically replaced when the task runs.

Step 3: Configure field mapping

After you select the data source and destination, you must specify the column mapping between the reader and writer. You can select Map Fields with Same Name, Map Fields in Same Line, Clear Mappings, or Manually Edit Mapping.

  • There are six default fields on the Kafka side.

    • __key__: The key of the Kafka record.

    • __value__: The value of the Kafka record.

    • __partition__: The partition number where the Kafka record is located. Partition numbers start from 0.

    • __headers__: The headers of the Kafka record.

    • __offset__: The offset of the Kafka record in its partition. Offsets start from 0.

    • __timestamp__: The 13-digit integer millisecond timestamp of the Kafka record.

  • You can configure custom JSON parsing for fields on the Kafka side. Use the . (to retrieve a subfield) and [] (to retrieve an array element) syntax to obtain content from the value field of a JSON-formatted Kafka record. A short sketch after this list shows how these path expressions resolve.

    Important

    If a JSON field name contains a "." character, you cannot retrieve the field value by defining the field. This is because it causes ambiguity in the field definition syntax.

    The following is an example of the data value of a JSON-formatted record in Kafka.

    {
        "a": {
            "a1": "hello"
        },
        "b": "world",
        "c": [
            "xxxxxxx",
            "yyyyyyy"
        ],
        "d": [
            {
                "AA": "this",
                "BB": "is_data"
            },
            {
                "AA": "that",
                "BB": "is_also_data"
            }
        ],
        "a.b": "unreachable"
    }
    • To synchronize the data for a1, "hello", you can add the field a.a1 on the Kafka side.

    • To synchronize the data for b, "world", you can add the field b on the Kafka side.

    • To synchronize the data for c, "yyyyyyy", you can add the field c[1] on the Kafka side.

    • To synchronize the data for AA, "this", you can add the field d[0].AA on the Kafka side.

    • If the field on the Kafka side is defined as a.b, you cannot synchronize the data "unreachable".

  • Source or destination table fields can be excluded from the mapping. The synchronization instance does not read unmapped source fields. NULL is written to unmapped destination fields.

  • You cannot map one source field to multiple destination fields. A destination field also cannot be mapped from multiple source fields.
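The path syntax in the example above can be mimicked outside DataWorks to verify which value a field definition selects. The following Python sketch is only an illustration; the resolve_path helper is hypothetical and is not part of DataWorks.

    import json
    import re

    # The sample Kafka record value from the example above.
    sample_value = """
    {
        "a": {"a1": "hello"},
        "b": "world",
        "c": ["xxxxxxx", "yyyyyyy"],
        "d": [{"AA": "this", "BB": "is_data"}, {"AA": "that", "BB": "is_also_data"}],
        "a.b": "unreachable"
    }
    """

    def resolve_path(record_value, path):
        """Resolve a field definition such as 'a.a1', 'c[1]', or 'd[0].AA' against a
        JSON-formatted record value. A top-level key that itself contains a '.'
        (such as "a.b") cannot be addressed, which mirrors the limitation above."""
        node = json.loads(record_value)
        # Split the path into name and index tokens, for example 'd[0].AA' -> ['d', '0', 'AA'].
        for token in re.findall(r"[^.\[\]]+", path):
            if isinstance(node, list):
                node = node[int(token)]   # [] syntax: array element by index
            else:
                node = node[token]        # . syntax: subfield by name
        return node

    print(resolve_path(sample_value, "a.a1"))     # hello
    print(resolve_path(sample_value, "b"))        # world
    print(resolve_path(sample_value, "c[1]"))     # yyyyyyy
    print(resolve_path(sample_value, "d[0].AA"))  # this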

Step 4: Configure advanced parameters

Click Advanced Configuration on the right side of the task to set parameters such as Maximum Expected Concurrency and Policy for Dirty Data Records. For this tutorial, set the Policy for Dirty Data Records to Ignore Dirty Data Records and leave the other parameters at their default values. For more information, see Codeless UI Configuration.

Step 5: Configure and run a test

  1. On the right side of the offline synchronization node editing page, click Debugging Configurations. Set the Resource Group and Script Parameters for the test run. Then, click Run in the top toolbar to test whether the synchronization link runs successfully.

  2. In the navigation pane on the left, click the icon to the right of Personal Directory to create a file with the .sql extension. You can then execute the following SQL query to check whether the data in the destination table is as expected.

    Note
    SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;
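
    If you want to run the same spot check outside the DataWorks SQL editor, a small PyODPS sketch such as the following could be used. The credentials, project, endpoint, table name, and partition value are placeholders, not values from this tutorial.

    from odps import ODPS

    # Placeholders: replace with your own credentials, project, and endpoint.
    o = ODPS(
        "<access_key_id>",
        "<access_key_secret>",
        project="<maxcompute_project>",
        endpoint="<maxcompute_endpoint>",
    )

    # The same spot check as the SQL above: read a few rows from one partition.
    sql = "SELECT * FROM <MaxCompute_destination_table_name> WHERE pt='<specified_partition>' LIMIT 20"
    with o.execute_sql(sql).open_reader() as reader:
        for record in reader:
            print(record)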

Step 6: Configure scheduling and publish the task

Click Scheduling on the right side of the offline synchronization task. Set the scheduling configuration parameters for the recurring run. Then, click Publish in the top toolbar to open the publishing panel. Follow the on-screen instructions to publish the task.

When you configured the data source and destination, you used three scheduling parameters: ${startTime}, ${endTime}, and ${partition}. In the scheduling configuration, you must specify the replacement policy for these parameters based on your synchronization needs. The following are configuration examples for several typical scenarios; a short sketch after the table shows how the expressions in the first scenario resolve at run time.

Typical scenario

Recommended configuration

Scenario description

The synchronization task is scheduled every 5 minutes

  • startTime=$[yyyymmddhh24mi-8/24/60]00

  • endTime=$[yyyymmddhh24mi-3/24/60]00

  • partition=$[yyyymmddhh24mi-8/24/60]

If the synchronization task is scheduled to start at 10:00 on 2022-11-22:

  • It synchronizes records from the Kafka topic with timestamps from 09:52 on 2022-11-22 (inclusive) to 09:57 on 2022-11-22 (exclusive).

  • The synchronized Kafka data is written to the 202211220952 partition in MaxCompute.

  • The endTime is set to three minutes earlier than the instance scheduling time ($[yyyymmddhh24mi]). This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts, which prevents data loss.

The synchronization task is scheduled every hour

  • startTime=$[yyyymmddhh24-1/24]0000

  • endTime=$[yyyymmddhh24]0000

  • partition=$[yyyymmddhh24]

Note
  • If the synchronization task is scheduled every 2 hours, set startTime=$[yyyymmddhh24-2/24]0000. The other scheduling parameters remain unchanged.

  • If the synchronization task is scheduled every 3 hours, set startTime=$[yyyymmddhh24-3/24]0000. The other scheduling parameters remain unchanged.

  • The same logic applies to other hourly scheduling cycles.

If the synchronization task is scheduled to start at 10:05 on 2022-11-22:

  • It synchronizes records from the Kafka topic with timestamps from 09:00 on 2022-11-22 (inclusive) to 10:00 on 2022-11-22 (exclusive).

  • The synchronized Kafka data is written to the 2022112210 partition in MaxCompute.

The synchronization task is scheduled every day

  • startTime=$[yyyymmdd-1]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

If the synchronization task is scheduled to start at 00:05 on 2022-11-22:

  • It synchronizes records from the Kafka topic with timestamps from 00:00 on 2022-11-21 (inclusive) to 00:00 on 2022-11-22 (exclusive).

  • The synchronized Kafka data is written to the 20221121 partition in MaxCompute.

The synchronization task is scheduled every week

  • startTime=$[yyyymmdd-7]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

If the synchronization task is scheduled to start at 00:05 on 2022-11-22:

  • It synchronizes records from the Kafka topic with timestamps from 00:00 on 2022-11-15 (inclusive) to 00:00 on 2022-11-22 (exclusive).

  • The synchronized Kafka data is written to the 20221121 partition in MaxCompute.

The synchronization task is scheduled every month

  • startTime=$[add_months(yyyymmdd,-1)]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

If the synchronization task is scheduled to start at 00:05 on 2022-11-22:

  • It synchronizes records from the Kafka topic with timestamps from 00:00 on 2022-10-22 (inclusive) to 00:00 on 2022-11-22 (exclusive).

  • The synchronized Kafka data is written to the 20221121 partition in MaxCompute.
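To make the parameter replacement concrete, the following Python sketch mimics how the expressions in the first scenario (startTime=$[yyyymmddhh24mi-8/24/60]00, endTime=$[yyyymmddhh24mi-3/24/60]00, and partition=$[yyyymmddhh24mi-8/24/60]) resolve for an instance scheduled at 10:00 on 2022-11-22. This only illustrates the arithmetic; DataWorks performs the actual replacement at run time.

    from datetime import datetime, timedelta

    # Instance scheduling time in the example: 10:00 on 2022-11-22.
    cyctime = datetime(2022, 11, 22, 10, 0)

    # $[yyyymmddhh24mi-8/24/60] subtracts 8 minutes from the scheduling time,
    # and $[yyyymmddhh24mi-3/24/60] subtracts 3 minutes. The trailing "00"
    # appended in the expressions stands for the seconds part.
    start_time = (cyctime - timedelta(minutes=8)).strftime("%Y%m%d%H%M") + "00"
    end_time = (cyctime - timedelta(minutes=3)).strftime("%Y%m%d%H%M") + "00"
    partition = (cyctime - timedelta(minutes=8)).strftime("%Y%m%d%H%M")

    print(start_time)  # 20221122095200 -> read from 09:52 (inclusive)
    print(end_time)    # 20221122095700 -> read until 09:57 (exclusive)
    print(partition)   # 202211220952   -> destination partition value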

Set the scheduling cycle based on the desired interval.

Typical scenario

Recommended configuration

Scenario description

The synchronization task is scheduled every 5 minutes

  • Scheduling Cycle: Minute

  • Start Time: 00:00

  • Interval: 5 minutes

  • End Time: 23:59

None

The synchronization task is scheduled every hour

  • Scheduling Cycle: Hour

  • Start Time: 00:15

  • Interval: 1 hour

  • End Time: 23:59

Set the start time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts.

The synchronization task is scheduled every day

  • Scheduling Cycle: Day

  • Scheduled Time: 00:15

Set the scheduled time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts.

The synchronization task is scheduled every week

  • Scheduling Cycle: Week

  • Specified Time: Monday

  • Scheduled Time: 00:15

Set the scheduled time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts.

The synchronization task is scheduled every month

  • Scheduling Cycle: Month

  • Specified Time: 1st of each month

  • Scheduled Time: 00:15

Set the scheduled time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts.

Important

If records with timestamps that are earlier than or equal to the start time are written to the Kafka topic after the instance starts, these records might not be read. If data writes to the Kafka topic are delayed or timestamps are out of order, be aware of the risk of data loss for the offline synchronization task.