This tutorial shows you how to set up a recurring offline synchronization task that reads time-windowed records from a Kafka topic and writes them to a partitioned MaxCompute table. The task can run on any schedule—by the minute, hour, day, week, or month.
Prerequisites
Before you begin, ensure that you have:
- Kafka and MaxCompute data sources configured. See Data source configuration.
- Network connectivity between the resource group and both data sources. See Overview of network connection solutions.
Limitations
- Syncing source data to MaxCompute foreign tables is not supported.
- The Kafka version must be 0.10.2 or later and 2.2.x or earlier.
- Kafka must have record timestamps enabled, and each record must carry a correct business timestamp.
Data loss risk
Records with timestamps earlier than or equal to the start time may arrive in the Kafka topic after a recurring instance starts. These records may not be read. If writes to the Kafka topic are delayed or timestamps are out of order, data loss can occur for the offline synchronization task.
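To make the risk concrete, here is a minimal sketch in plain Python (the `Record` class and `read_window` helper are hypothetical stand-ins for the reader, not DataWorks APIs) of the half-open time window an instance reads, and of how a record with an in-window timestamp that arrives after the read is missed:

```python
from dataclasses import dataclass

@dataclass
class Record:
    timestamp_ms: int  # 13-digit millisecond record timestamp
    value: str

def read_window(records, start_ms, end_ms):
    """Records an instance reads for the half-open window [start_ms, end_ms)."""
    return [r for r in records if start_ms <= r.timestamp_ms < end_ms]

# Records already written to the topic when the instance starts reading.
in_topic = [Record(1_669_086_000_000, "on-time")]
window = read_window(in_topic, 1_669_086_000_000, 1_669_089_600_000)

# A record written *after* the read, with a timestamp inside the window,
# is never seen by this instance; the next instance's window starts at
# this instance's end time, so it is not picked up there either.
late = Record(1_669_086_500_000, "late")
```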
Configure the synchronization task
This tutorial uses the new DataStudio interface.
Step 1: Create a node
Follow the Codeless UI configuration guide to create and configure an offline synchronization node. This tutorial focuses on the configuration details specific to Kafka-to-MaxCompute synchronization.
Step 2: Configure the data source and destination
Configure the data source (Kafka)
This tutorial demonstrates a single-table offline synchronization from Kafka to MaxCompute. For a complete reference of all Kafka Reader configuration options, see the Kafka Reader document.
The following table covers the key configuration items for this tutorial.
| Configuration item | Key configuration |
|---|---|
| Topic | Select the Kafka topic to synchronize. In a standard mode DataWorks workspace, a topic with the same name must exist in the Kafka clusters for both the development and production environments. Note: If the topic is missing from the development environment, it does not appear in the Topic drop-down list. If it is missing from the production environment, the recurring schedule fails after the task is published because the task cannot find the topic to synchronize. |
| Consumer Group ID | Enter a consumer group ID that is unique within the Kafka cluster. Kafka uses this ID for consumption statistics and monitoring. |
| Read Start Offset and Start Time | Set Read Start Offset to Specific Time, then set Start Time to ${startTime}. This defines where synchronization begins—records from ${startTime} onward are included. |
| Read End Offset and End Time | Set Read End Offset to Specific Time, then set End Time to ${endTime}. Records up to (but not including) ${endTime} are synchronized. Both ${startTime} and ${endTime} are replaced with concrete timestamps at runtime based on the scheduling parameter expressions you configure. |
| Time Zone | Leave blank to use the default server time zone for the DataWorks region. If you have changed the scheduling time zone with Alibaba Cloud support, select that time zone here. |
| Key Type, Value Type, Encoding | Select based on the actual records in the Kafka topic. |
| Synchronization Completion Policy | Controls when the synchronization task stops reading. Choose based on your Kafka traffic patterns—see the comparison below. |
| Advanced Configuration | Keep the defaults. |
Choosing a synchronization completion policy
The two options behave differently and suit different scenarios:
| | No new data for 1 minute | Stop at the specified end position |
|---|---|---|
| How it works | The task stops when no new records arrive across all partitions for 1 consecutive minute. | The task stops as soon as it reaches the offset corresponding to ${endTime}. |
| Use when | All of the following are true: (1) some or all partitions routinely go silent for long periods, such as more than 10 minutes, and (2) no records with timestamps earlier than ${endTime} will be written to the topic after each recurring instance starts. | You cannot guarantee (1) or (2) above. This is the safer default. |
| Risk if misused | Data loss if late-arriving records are still being written when the 1-minute silence occurs. | None; the task reliably stops at the configured end position. |
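The behavioral difference between the two policies can be sketched in plain Python (`poll`, `clock`, and the record tuples below are hypothetical stand-ins for the consumer's poll call, a clock, and consumed records, not DataWorks APIs):

```python
def consume_until_idle(poll, clock, idle_limit_s=60):
    """'No new data for 1 minute': keep polling, and stop only after
    idle_limit_s elapse with no records arriving on any poll."""
    consumed, last_data = [], clock()
    while clock() - last_data < idle_limit_s:
        batch = poll()
        if batch:
            consumed.extend(batch)
            last_data = clock()  # reset the idle timer on any data
    return consumed

def consume_until_end(records, end_ms):
    """'Stop at the specified end position': read in order and stop at
    the first record whose timestamp reaches the ${endTime} position."""
    consumed = []
    for ts_ms, value in records:
        if ts_ms >= end_ms:
            break
        consumed.append(value)
    return consumed
```

The first policy trades a deterministic stop for tolerance of quiet partitions; the second stops deterministically but drops anything written late.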
Configure the data destination (MaxCompute)
| Configuration item | Key configuration |
|---|---|
| Data Source | Displays the MaxCompute data source selected in the previous step. In a standard mode workspace, both development and production project names are shown. |
| Table | Select the destination MaxCompute table. In a standard mode workspace, a table with the same name and schema must exist in both environments. Alternatively, click Generate Target Table Schema to let the system create a table automatically, then adjust the CREATE TABLE statement as needed. Note: If the table is missing from the development environment, it does not appear in the drop-down list. If it is missing from the production environment, the task fails at runtime. If schemas differ between environments, column mapping may produce incorrect data writes. |
| Partition | For partitioned tables, enter the partition key value. Use a static value such as ds=20220101, or a scheduling parameter such as ds=${partition} that is replaced automatically at runtime. |
Step 3: Configure field mapping
After selecting the source and destination, map columns between the Kafka reader and the MaxCompute writer. Use Map Fields with Same Name, Map Fields in Same Line, Clear Mappings, or Manually Edit Mapping as needed.
Default Kafka fields
Kafka exposes six built-in fields that you can map directly to MaxCompute columns.
| Field name | Description |
|---|---|
| `__key__` | The key of the Kafka record. |
| `__value__` | The value of the Kafka record. |
| `__partition__` | The partition number where the record is located. Starts from 0. |
| `__headers__` | The headers of the Kafka record. |
| `__offset__` | The record's offset within its partition. Starts from 0. |
| `__timestamp__` | The record timestamp as a 13-digit millisecond integer. |
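As an illustration of the field semantics above, here is a sketch that projects a consumed record onto the six built-in columns. The attribute names mirror kafka-python's `ConsumerRecord`; a `SimpleNamespace` stands in for a real consumed record, so no broker is needed:

```python
from types import SimpleNamespace

def builtin_fields(rec):
    """Project a consumed Kafka record onto the reader's six built-in fields."""
    return {
        "__key__": rec.key,
        "__value__": rec.value,
        "__partition__": rec.partition,  # 0-based partition number
        "__headers__": rec.headers,
        "__offset__": rec.offset,        # 0-based offset within the partition
        "__timestamp__": rec.timestamp,  # 13-digit millisecond timestamp
    }

# Fake record standing in for a real ConsumerRecord.
rec = SimpleNamespace(key=b"k1", value=b'{"b": "world"}', partition=0,
                      headers=[("h", b"v")], offset=42,
                      timestamp=1_669_082_700_000)
row = builtin_fields(rec)
```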
JSON field parsing
For JSON-formatted Kafka values, add custom field definitions using `.` to access subfields and `[]` to access array elements.
Given this example record value:
    {
      "a": {
        "a1": "hello"
      },
      "b": "world",
      "c": [
        "xxxxxxx",
        "yyyyyyy"
      ],
      "d": [
        { "AA": "this", "BB": "is_data" },
        { "AA": "that", "BB": "is_also_data" }
      ],
      "a.b": "unreachable"
    }
| Field definition | Retrieved value | Notes |
|---|---|---|
| `a.a1` | `"hello"` | Subfield access with `.` |
| `b` | `"world"` | Top-level field |
| `c[1]` | `"yyyyyyy"` | Array element access with `[]` |
| `d[0].AA` | `"this"` | Combined access |
| `a.b` | *(not retrievable)* | Field names containing `.` cannot be parsed; the `.` is interpreted as a subfield separator, causing ambiguity. |
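A small sketch of this path syntax (my own illustrative parser, not DataWorks code) shows why `a.b` is unreachable: the path is tokenized on `.` and `[n]` before any lookup, so a literal key containing `.` can never be addressed.

```python
import json
import re

# Tokens are either bare name segments or [n] array indexes.
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")

def extract(value, path):
    """Resolve a field definition like 'd[0].AA' against parsed JSON."""
    for name, index in _TOKEN.findall(path):
        value = value[name] if name else value[int(index)]
    return value

record = json.loads("""{
  "a": {"a1": "hello"},
  "b": "world",
  "c": ["xxxxxxx", "yyyyyyy"],
  "d": [{"AA": "this", "BB": "is_data"}],
  "a.b": "unreachable"
}""")
```

Here `extract(record, "a.b")` raises `KeyError`: the path splits into `a` then `b`, so the top-level key `"a.b"` is never consulted.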
Mapping rules
- Unmapped source fields are not read by the synchronization instance.
- NULL is written to unmapped destination fields.
- One source field cannot map to multiple destination fields.
- One destination field cannot be mapped from multiple source fields.
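The four rules above can be summarized in a short sketch (a hypothetical helper, not DataWorks code; `mapping` maps each destination column to at most one source field):

```python
def apply_mapping(source_row, mapping, dest_columns):
    """Apply a column mapping: unmapped source fields are dropped, and
    unmapped destination columns receive NULL (None here)."""
    # Each source field may feed at most one destination field.
    assert len(set(mapping.values())) == len(mapping)
    return {col: source_row.get(mapping[col]) if col in mapping else None
            for col in dest_columns}
```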
Step 4: Configure advanced parameters
Click Advanced Configuration on the right side of the task. For this tutorial, set Policy for Dirty Data Records to Ignore Dirty Data Records and leave all other parameters at their defaults. For parameter details, see Codeless UI configuration.
Step 5: Test the synchronization
- Click Run Configuration on the right side of the editing page. Set the Resource Group and Script Parameters for the test run, then click Run in the top toolbar.
- After the test run completes, verify the data in the destination table. In the left navigation pane, create a file with the `.sql` extension and run the following query: `SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;`
  Note:
  - To query data this way, attach the destination MaxCompute project to DataWorks as a computing resource.
  - On the `.sql` file editing page, click Run Configuration on the right, specify the Computing Resource and Resource Group, then click Run.
Step 6: Configure scheduling and publish
Click Scheduling on the right side of the task to configure the scheduling settings for the recurring run, then click Publish to publish the task.
The three scheduling parameters used in this tutorial—${startTime}, ${endTime}, and ${partition}—are linked: the values injected into ${startTime} and ${endTime} control the Kafka time window for each instance, while ${partition} determines which MaxCompute partition receives the data. Configure all three together based on your scheduling cycle.
The following examples cover the most common scheduling patterns.
Scheduling parameter expressions
| Scheduling cycle | startTime expression | endTime expression | partition expression |
|---|---|---|---|
| Every 5 minutes | `$[yyyymmddhh24mi-8/24/60]00` | `$[yyyymmddhh24mi-3/24/60]00` | `$[yyyymmddhh24mi-8/24/60]` |
| Every hour | `$[yyyymmddhh24-1/24]0000` | `$[yyyymmddhh24]0000` | `$[yyyymmddhh24]` |
| Every 2 hours | `$[yyyymmddhh24-2/24]0000` | `$[yyyymmddhh24]0000` | `$[yyyymmddhh24]` |
| Every 3 hours | `$[yyyymmddhh24-3/24]0000` | `$[yyyymmddhh24]0000` | `$[yyyymmddhh24]` |
| Every day | `$[yyyymmdd-1]000000` | `$[yyyymmdd]000000` | `$[yyyymmdd-1]` |
| Every week | `$[yyyymmdd-7]000000` | `$[yyyymmdd]000000` | `$[yyyymmdd-1]` |
| Every month | `$[add_months(yyyymmdd,-1)]000000` | `$[yyyymmdd]000000` | `$[yyyymmdd-1]` |
How the expressions work (hourly example)
For a task scheduled to run at 10:05 on 2022-11-22:
- `startTime` resolves to `20221122090000`: reads records from 09:00 (inclusive)
- `endTime` resolves to `20221122100000`: stops at 10:00 (exclusive)
- `partition` resolves to `2022112210`: writes to that partition in MaxCompute
5-minute example
For a task scheduled to run at 10:00 on 2022-11-22:
- `startTime` resolves to `20221122095200`: reads records from 09:52 (inclusive)
- `endTime` resolves to `20221122095700`: stops at 09:57 (exclusive)
- `partition` resolves to `202211220952`
The endTime is set 3 minutes before the instance run time. This buffer ensures all records for the time window have been written to Kafka before the instance starts reading.
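The resolutions in the two examples above can be reproduced with a short sketch (assumed semantics, consistent with the examples: in these `$[...]` expressions, `-N/24` subtracts N hours and `-N/24/60` subtracts N minutes from the scheduled run time):

```python
from datetime import datetime, timedelta

def resolve(run_time, fmt, days=0, hours=0, minutes=0, suffix=""):
    """Evaluate a $[...] pattern: shift the scheduled run time backward,
    format it, and append the literal suffix (e.g. '0000')."""
    t = run_time - timedelta(days=days, hours=hours, minutes=minutes)
    return t.strftime(fmt) + suffix

hourly = datetime(2022, 11, 22, 10, 5)   # hourly instance run at 10:05
start = resolve(hourly, "%Y%m%d%H", hours=1, suffix="0000")  # $[yyyymmddhh24-1/24]0000
end = resolve(hourly, "%Y%m%d%H", suffix="0000")             # $[yyyymmddhh24]0000

five_min = datetime(2022, 11, 22, 10, 0)  # 5-minute instance run at 10:00
start5 = resolve(five_min, "%Y%m%d%H%M", minutes=8, suffix="00")  # $[yyyymmddhh24mi-8/24/60]00
end5 = resolve(five_min, "%Y%m%d%H%M", minutes=3, suffix="00")    # $[yyyymmddhh24mi-3/24/60]00
```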
Scheduling cycle settings
| Scheduling cycle | Cycle | Start time | Interval | End time | Day/date |
|---|---|---|---|---|---|
| Every 5 minutes | Minute | 00:00 | 5 minutes | 23:59 | — |
| Every hour | Hour | 00:15 | 1 hour | 23:59 | — |
| Every day | Day | 00:15 | — | — | — |
| Every week | Week | 00:15 | — | — | Monday |
| Every month | Month | 00:15 | — | — | 1st of each month |
For hourly, daily, weekly, and monthly schedules, set the start time slightly after midnight (for example, 00:15 rather than 00:00). This gives Kafka time to finish writing records before the synchronization instance starts, reducing the risk of data loss.
If records with timestamps earlier than or equal to ${startTime} are written to the Kafka topic after a recurring instance has already started, those records may not be read. Delayed writes or out-of-order timestamps increase the risk of data loss.
What's next
- Review the Kafka Reader reference for the full list of configuration options.
- See Scheduling configuration for advanced scheduling options such as dependencies and rerun policies.