
DataWorks:Batch synchronization from a single Kafka table to MaxCompute

Last Updated: Mar 26, 2026

This tutorial shows you how to set up a recurring offline synchronization task that reads time-windowed records from a Kafka topic and writes them to a partitioned MaxCompute table. The task can run on any schedule—by the minute, hour, day, week, or month.

Prerequisites

Before you begin, make sure that the required environment setup is complete.

Limitations

  • Syncing source data to MaxCompute foreign tables is not supported.

  • Kafka version must be 0.10.2 or later, and 2.2.x or earlier.

  • Kafka must have record timestamps enabled, and each record must carry a correct business timestamp.

Data loss risk

Records with timestamps earlier than or equal to the start time may arrive in the Kafka topic after a recurring instance starts. These records may not be read. If writes to the Kafka topic are delayed or timestamps are out of order, data loss can occur for the offline synchronization task.
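The reader consumes a half-open time window: records with timestamps in [startTime, endTime) are included. The following sketch (our own illustration, not DataWorks code) models that window and shows why a late-arriving record stamped inside an earlier window is never read:

```python
from datetime import datetime, timezone

def in_window(record_ts_ms: int, start: datetime, end: datetime) -> bool:
    """Return True if the record falls in the half-open window [start, end).

    Kafka record timestamps are 13-digit millisecond integers.
    """
    ts = datetime.fromtimestamp(record_ts_ms / 1000, tz=timezone.utc)
    return start <= ts < end

# Window for the instance that covers 09:00-10:00.
start = datetime(2022, 11, 22, 9, 0, tzinfo=timezone.utc)
end = datetime(2022, 11, 22, 10, 0, tzinfo=timezone.utc)

# A record stamped 08:59:59 belongs to the previous window (08:00-09:00).
# If it arrives in the topic only after that instance has already run,
# no later instance will read it: that is the data loss described above.
late = int(datetime(2022, 11, 22, 8, 59, 59, tzinfo=timezone.utc).timestamp() * 1000)
ontime = int(datetime(2022, 11, 22, 9, 30, 0, tzinfo=timezone.utc).timestamp() * 1000)

print(in_window(late, start, end))    # False
print(in_window(ontime, start, end))  # True
```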

Configure the synchronization task

Note

This tutorial uses the new DataStudio interface.

Step 1: Create a node

Follow the Codeless UI configuration guide to create and configure an offline synchronization node. This tutorial focuses on the configuration details specific to Kafka-to-MaxCompute synchronization.

Step 2: Configure the data source and destination

Configure the data source (Kafka)

This tutorial demonstrates a single-table offline synchronization from Kafka to MaxCompute. For a complete reference of all Kafka Reader configuration options, see the Kafka Reader document.

The following table covers the key configuration items for this tutorial.

  • Topic: Select the Kafka topic to synchronize. In a standard mode DataWorks workspace, a topic with the same name must exist in the Kafka clusters for both the development and production environments.

    Note: If the topic is missing from the development environment, it will not appear in the Topic drop-down list. If it is missing from the production environment, the recurring schedule will fail after the task is published, because the task cannot find the topic to synchronize.

  • Consumer Group ID: Enter an ID that is unique within the Kafka cluster. This ID is used for consumption statistics and monitoring on the Kafka side.

  • Read Start Offset and Start Time: Set Read Start Offset to Specific Time, then set Start Time to ${startTime}. This defines where synchronization begins: records from ${startTime} onward are included.

  • Read End Offset and End Time: Set Read End Offset to Specific Time, then set End Time to ${endTime}. Records up to (but not including) ${endTime} are synchronized. Both ${startTime} and ${endTime} are replaced with concrete timestamps at runtime based on the scheduling parameter expressions you configure.

  • Time Zone: Leave blank to use the default server time zone for the DataWorks region. If you have changed the scheduling time zone with Alibaba Cloud support, select that time zone here.

  • Key Type, Value Type, Encoding: Select based on the actual records in the Kafka topic.

  • Synchronization Completion Policy: Controls when the synchronization task stops reading. Choose based on your Kafka traffic patterns. See the comparison below.

  • Advanced Configuration: Keep the defaults.

Choosing a synchronization completion policy

The two options behave differently and suit different scenarios:

No new data for 1 minute

  • How it works: The task stops when no new records arrive across all partitions for 1 consecutive minute.

  • Use when: Both of the following are true: (1) some or all partitions routinely go silent for long periods, such as more than 10 minutes, and (2) no records with timestamps earlier than ${endTime} will be written to the topic after each recurring instance starts.

  • Risk if misused: Data loss if late-arriving records are still being written when the 1-minute silence occurs.

Stop at the specified end position

  • How it works: The task stops as soon as it reaches the offset corresponding to ${endTime}.

  • Use when: You cannot guarantee (1) or (2) above. This is the safer default.

  • Risk if misused: None. The task reliably stops at the configured end position.
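The difference between the two policies boils down to a stop condition. The following sketch is illustrative only; the function name, policy labels, and 60-second constant are our own, not DataWorks APIs:

```python
def should_stop(policy: str, idle_seconds: float, reached_end_offset: bool,
                idle_limit: float = 60.0) -> bool:
    """Stop decision for the two completion policies.

    'stop_at_end_position' stops exactly when the reader reaches the
    offset corresponding to ${endTime}; 'no_new_data_for_1_minute'
    stops after one minute without new records across all partitions.
    """
    if policy == "stop_at_end_position":
        return reached_end_offset
    if policy == "no_new_data_for_1_minute":
        return idle_seconds >= idle_limit
    raise ValueError(f"unknown policy: {policy}")

# A quiet topic: 90 s without new records, end offset not yet reached.
# The silence-based policy stops (and may lose late records); the
# end-position policy keeps waiting for the end offset.
print(should_stop("no_new_data_for_1_minute", 90.0, False))  # True
print(should_stop("stop_at_end_position", 90.0, False))      # False
```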

Configure the data destination (MaxCompute)

  • Data Source: Displays the MaxCompute data source selected in the previous step. In a standard mode workspace, both the development and production project names are shown.

  • Table: Select the destination MaxCompute table. In a standard mode workspace, a table with the same name and schema must exist in both environments. Alternatively, click Generate Target Table Schema to let the system create a table automatically, then adjust the CREATE TABLE statement as needed.

    Note: If the table is missing from the development environment, it will not appear in the drop-down list. If it is missing from the production environment, the task will fail at runtime. If schemas differ between environments, column mapping may produce incorrect data writes.

  • Partition: For partitioned tables, enter the partition key value. Use a static value such as ds=20220101, or a scheduling parameter such as ds=${partition} that is replaced automatically at runtime.

Step 3: Configure field mapping

After selecting the source and destination, map columns between the Kafka reader and the MaxCompute writer. Use Map Fields with Same Name, Map Fields in Same Line, Clear Mappings, or Manually Edit Mapping as needed.

Default Kafka fields

Kafka exposes six built-in fields that you can map directly to MaxCompute columns.

  • __key__: The key of the Kafka record.

  • __value__: The value of the Kafka record.

  • __partition__: The number of the partition that contains the record. Partition numbers start from 0.

  • __headers__: The headers of the Kafka record.

  • __offset__: The record's offset within its partition. Offsets start from 0.

  • __timestamp__: The record timestamp as a 13-digit millisecond integer.
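The following sketch shows how these six fields line up with destination columns. KafkaMessage here is a stand-in namedtuple for illustration, not an actual client type, although real Kafka consumer records expose the same attributes:

```python
from collections import namedtuple
from datetime import datetime, timezone

# Stand-in for a consumed record (illustrative, not a real client type).
KafkaMessage = namedtuple(
    "KafkaMessage", ["key", "value", "partition", "offset", "timestamp", "headers"]
)

def to_row(msg: KafkaMessage) -> dict:
    """Project the six built-in Kafka fields onto destination columns."""
    return {
        "__key__": msg.key,
        "__value__": msg.value,
        "__partition__": msg.partition,   # partition number, starting at 0
        "__offset__": msg.offset,         # offset within the partition, starting at 0
        "__headers__": msg.headers,
        "__timestamp__": msg.timestamp,   # 13-digit millisecond integer
    }

msg = KafkaMessage(
    key=b"k1", value=b'{"b": "world"}', partition=0, offset=42,
    timestamp=1669107600000, headers=[("source", b"app-1")],
)
row = to_row(msg)
# The millisecond timestamp converts to a readable datetime:
print(datetime.fromtimestamp(row["__timestamp__"] / 1000, tz=timezone.utc))
# 2022-11-22 09:00:00+00:00
```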

JSON field parsing

For JSON-formatted Kafka values, add custom field definitions using . to access subfields and [] to access array elements.

Given this example record value:

{
  "a": {
    "a1": "hello"
  },
  "b": "world",
  "c": [
    "xxxxxxx",
    "yyyyyyy"
  ],
  "d": [
    { "AA": "this", "BB": "is_data" },
    { "AA": "that", "BB": "is_also_data" }
  ],
  "a.b": "unreachable"
}
  • a.a1 retrieves "hello" (subfield access with .)

  • b retrieves "world" (top-level field)

  • c[1] retrieves "yyyyyyy" (array element access with [])

  • d[0].AA retrieves "this" (combined access)

  • a.b retrieves nothing: field names that contain . cannot be parsed, because the . is interpreted as a subfield separator, causing ambiguity.
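The path rules above can be sketched as a small resolver. This is our own simplified model of the `.` and `[]` semantics, not the actual parser used by the synchronization task:

```python
import json
import re

def resolve(value: str, path: str):
    """Resolve a field definition such as 'a.a1', 'c[1]', or 'd[0].AA'
    against a JSON record value. Returns None when the path cannot be
    resolved, e.g. for a literal key that contains '.'.
    """
    node = json.loads(value)
    # Split on '.' first, then peel off any [n] array indexes per segment.
    for segment in path.split("."):
        m = re.fullmatch(r"(\w+)((?:\[\d+\])*)", segment)
        if m is None:
            return None
        name, indexes = m.group(1), m.group(2)
        if not isinstance(node, dict) or name not in node:
            return None
        node = node[name]
        for idx in re.findall(r"\[(\d+)\]", indexes):
            if not isinstance(node, list) or int(idx) >= len(node):
                return None
            node = node[int(idx)]
    return node

record = '''{
  "a": {"a1": "hello"},
  "b": "world",
  "c": ["xxxxxxx", "yyyyyyy"],
  "d": [{"AA": "this", "BB": "is_data"},
        {"AA": "that", "BB": "is_also_data"}],
  "a.b": "unreachable"
}'''

print(resolve(record, "a.a1"))     # hello
print(resolve(record, "c[1]"))     # yyyyyyy
print(resolve(record, "d[0].AA"))  # this
# "a.b" is split on '.', so the literal key "a.b" can never be reached:
print(resolve(record, "a.b"))      # None
```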

Mapping rules

  • Unmapped source fields are not read by the synchronization instance.

  • NULL is written to unmapped destination fields.

  • One source field cannot map to multiple destination fields.

  • One destination field cannot be mapped from multiple source fields.
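Taken together, the rules describe a one-to-one mapping. The following sketch captures them; apply_mapping and its arguments are illustrative names, not part of the DataWorks configuration:

```python
def apply_mapping(source_row: dict, mapping: dict, dest_columns: list) -> dict:
    """Apply a one-to-one field mapping.

    mapping maps each destination column to exactly one source field,
    so a destination field can never be fed by multiple source fields;
    the uniqueness check below enforces the reverse direction.
    """
    if len(set(mapping.values())) != len(mapping):
        raise ValueError("a source field may map to at most one destination field")
    # Unmapped destination columns receive NULL (None); source fields
    # that appear in no mapping are simply never read.
    return {
        col: source_row.get(mapping[col]) if col in mapping else None
        for col in dest_columns
    }

row = apply_mapping(
    {"__key__": "k1", "__value__": "v1"},  # __value__ is unmapped: ignored
    {"key_col": "__key__"},
    ["key_col", "extra_col"],              # extra_col is unmapped: NULL
)
print(row)  # {'key_col': 'k1', 'extra_col': None}
```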

Step 4: Configure advanced parameters

Click Advanced Configuration on the right side of the task. For this tutorial, set Policy for Dirty Data Records to Ignore Dirty Data Records and leave all other parameters at their defaults. For parameter details, see Codeless UI configuration.

Step 5: Test the synchronization

  1. Click Run Configuration on the right side of the editing page. Set the Resource Group and Script Parameters for the test run, then click Run in the top toolbar.

  2. After the test run completes, verify the data in the destination table. In the left navigation pane, create a file with the .sql extension and run the following query:

    SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;

Step 6: Configure scheduling and publish

Click Scheduling on the right side of the task to configure the scheduling settings for the recurring run, then click Publish to publish the task.

The three scheduling parameters used in this tutorial—${startTime}, ${endTime}, and ${partition}—are linked: the values injected into ${startTime} and ${endTime} control the Kafka time window for each instance, while ${partition} determines which MaxCompute partition receives the data. Configure all three together based on your scheduling cycle.

The following examples cover the most common scheduling patterns.

Scheduling parameter expressions

  • Every 5 minutes: startTime $[yyyymmddhh24mi-8/24/60]00, endTime $[yyyymmddhh24mi-3/24/60]00, partition $[yyyymmddhh24mi-8/24/60]

  • Every hour: startTime $[yyyymmddhh24-1/24]0000, endTime $[yyyymmddhh24]0000, partition $[yyyymmddhh24]

  • Every 2 hours: startTime $[yyyymmddhh24-2/24]0000, endTime $[yyyymmddhh24]0000, partition $[yyyymmddhh24]

  • Every 3 hours: startTime $[yyyymmddhh24-3/24]0000, endTime $[yyyymmddhh24]0000, partition $[yyyymmddhh24]

  • Every day: startTime $[yyyymmdd-1]000000, endTime $[yyyymmdd]000000, partition $[yyyymmdd-1]

  • Every week: startTime $[yyyymmdd-7]000000, endTime $[yyyymmdd]000000, partition $[yyyymmdd-1]

  • Every month: startTime $[add_months(yyyymmdd,-1)]000000, endTime $[yyyymmdd]000000, partition $[yyyymmdd-1]

How the expressions work (hourly example)

For a task scheduled to run at 10:05 on 2022-11-22:

  • startTime resolves to 20221122090000 — reads records from 09:00 (inclusive)

  • endTime resolves to 20221122100000 — stops at 10:00 (exclusive)

  • partition resolves to 2022112210 — writes to that partition in MaxCompute
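The hourly resolution above can be modeled in a few lines. This is a simplified sketch of the documented expression semantics, where subtracting 1/24 of a day shifts the cycle time back one hour and the literal 0000 suffix pads minutes and seconds; hourly_params is an illustrative name, not a DataWorks function:

```python
from datetime import datetime, timedelta

def hourly_params(cycle_time: datetime) -> dict:
    """Resolve the hourly expressions $[yyyymmddhh24-1/24]0000,
    $[yyyymmddhh24]0000, and $[yyyymmddhh24] for a given cycle time.
    """
    # Formatting at yyyymmddhh24 granularity drops minutes, so 10:05
    # and 10:00 resolve to the same hour value.
    hour = cycle_time.replace(minute=0, second=0, microsecond=0)
    prev_hour = hour - timedelta(hours=1)  # -1/24 of a day
    return {
        "startTime": prev_hour.strftime("%Y%m%d%H") + "0000",
        "endTime": hour.strftime("%Y%m%d%H") + "0000",
        "partition": hour.strftime("%Y%m%d%H"),
    }

params = hourly_params(datetime(2022, 11, 22, 10, 5))
print(params)
# {'startTime': '20221122090000', 'endTime': '20221122100000', 'partition': '2022112210'}
```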

5-minute example

For a task scheduled to run at 10:00 on 2022-11-22:

  • startTime resolves to 20221122095200 — reads records from 09:52 (inclusive)

  • endTime resolves to 20221122095700 — stops at 09:57 (exclusive)

  • partition resolves to 202211220952

The endTime is set 3 minutes before the instance run time. This buffer ensures all records for the time window have been written to Kafka before the instance starts reading.

Scheduling cycle settings

  • Every 5 minutes: Cycle Minute, Start time 00:00, Interval 5 minutes, End time 23:59

  • Every hour: Cycle Hour, Start time 00:15, Interval 1 hour, End time 23:59

  • Every day: Cycle Day, Scheduled time 00:15

  • Every week: Cycle Week, Scheduled time 00:15, on Monday

  • Every month: Cycle Month, Scheduled time 00:15, on the 1st of each month

For hourly, daily, weekly, and monthly schedules, set the start time slightly after midnight (for example, 00:15 rather than 00:00). This gives Kafka time to finish writing records before the synchronization instance starts, reducing the risk of data loss.

Important

If records with timestamps earlier than or equal to ${startTime} are written to the Kafka topic after a recurring instance has already started, those records may not be read. Delayed writes or out-of-order timestamps increase the risk of data loss.

What's next

  • Review the Kafka Reader reference for the full list of configuration options.

  • See Scheduling configuration for advanced scheduling options such as dependencies and rerun policies.