This topic describes how to configure a recurring schedule to synchronize incremental data from Kafka to MaxCompute. The example covers synchronizing data by the minute, hour, or day to hourly or daily partitioned tables in MaxCompute.
Precautions
Your Kafka version must be 0.10.2 or later and 2.2.x or earlier. Record timestamps must be enabled in Kafka, and the records must carry correct business timestamps (see the producer sketch below).
After incremental data synchronization starts, records with timestamps that are earlier than or equal to the start time might still be written to the Kafka topic. These records might not be read. If data writes to the Kafka topic are delayed or timestamps are out of order, be aware of the risk of data loss for the offline synchronization task.
For the synchronization end policy on the Kafka side, you can select No new data for 1 minute only if the following conditions are met. Otherwise, data loss may occur.
Some or all partitions in the Kafka topic have no new data written for a long time, such as for more than 10 minutes.
After each recurring instance starts, no records with timestamps earlier than the end time parameter are written to the Kafka topic.
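The business timestamp requirement in the precautions above means that each record must be produced with an explicit, correct timestamp. Below is a minimal sketch of producing such a record, assuming the kafka-python client; the broker address, topic name, and payload are placeholders, not values from this tutorial.

```python
# Minimal sketch: produce a Kafka record with an explicit business timestamp.
# Assumes the kafka-python package; broker, topic, and payload are placeholders.
import json
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="broker-host:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# The offline synchronization task reads records whose timestamps fall within
# the configured start/end window, so timestamp_ms must be a correct
# 13-digit millisecond timestamp.
producer.send(
    "your_topic",
    value={"user_id": 1, "event": "click"},
    timestamp_ms=int(time.time() * 1000),
)
producer.flush()
```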
Prerequisites
Purchase a Serverless resource group.
Create Kafka and MaxCompute data sources. For more information, see Data Source Configuration.
Establish a network connection between the resource group and the data sources. For more information, see Overview of network connection solutions.
Limits
Synchronizing source data to MaxCompute external tables is not supported.
Procedure
This topic uses the new DataStudio interface to demonstrate how to configure an offline synchronization task.
Step 1: Create a node and configure the task
For the general steps to create and configure a node in the codeless UI, see the Codeless UI configuration guide. This topic does not repeat those steps.
Step 2: Configure the data source and destination
Configure the data source (Kafka)
This topic describes how to perform an offline synchronization of a single table from Kafka to MaxCompute. The data source is Kafka. The key configuration points are as follows.
For a general description of the configuration items for a Kafka data source, see the Kafka Reader document. The following are reference configurations for this tutorial.
| Configuration item | Key configuration |
| --- | --- |
| Topic | Select the Kafka topic to synchronize. If you use a standard mode DataWorks workspace, a topic with the same name must exist in the Kafka clusters for both the development and production environments, and you select that topic here. |
| Consumer Group ID | Enter an ID that is unique within the Kafka cluster, based on your business needs. This helps with statistics and monitoring on the Kafka cluster side. |
| Read Start Offset, Start Time, Read End Offset, End Time | Set both Read Start Offset and Read End Offset to Specific Time. Set Start Time and End Time to the scheduling parameters ${startTime} and ${endTime}. These parameters define the start and end positions for data synchronization: each instance reads the records whose timestamps fall between the values that ${startTime} and ${endTime} resolve to at run time (see the sketch after this table). |
| Time Zone | You can leave this empty or select the default server time zone of the region where DataWorks is located. Note: If you have contacted Alibaba Cloud technical support to change the scheduling time zone, you can select your modified time zone here. |
| Key Type, Value Type, Encoding | Select these based on the actual records in the Kafka topic. |
| Synchronization Completion Policy | Select No new data for 1 minute only if the conditions listed in Precautions are met. Otherwise, select Stop at the specified end position. |
| Advanced Configuration | Keep the default settings. |
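The Specific Time start and end offsets configured above rely on Kafka's timestamp-based offset lookup. Below is a minimal sketch of that lookup, assuming the kafka-python client; the broker address, topic name, and timestamps are placeholders, not values from this tutorial.

```python
# Minimal sketch: resolve per-partition offsets from business timestamps,
# the same kind of lookup that "Specific Time" start/end offsets rely on.
# Assumes the kafka-python package; broker, topic, and times are placeholders.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="broker-host:9092")

topic = "your_topic"
start_ms = 1669082400000  # start of the window, epoch milliseconds (placeholder)
end_ms = 1669082700000    # end of the window, epoch milliseconds (placeholder)

partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

# offsets_for_times() returns, per partition, the earliest offset whose record
# timestamp is greater than or equal to the given timestamp (or None if no
# such record exists yet).
start_offsets = consumer.offsets_for_times({tp: start_ms for tp in partitions})
end_offsets = consumer.offsets_for_times({tp: end_ms for tp in partitions})

for tp in partitions:
    print(tp.partition, start_offsets[tp], end_offsets[tp])
```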
Configure the data destination (MaxCompute)
This topic describes how to perform an offline synchronization of a single table from Kafka to MaxCompute. The data destination is a MaxCompute table. The key configuration points are as follows.
For parameters not described in the following table, you can keep the default settings.
| Configuration item | Key configuration |
| --- | --- |
| Data Source | Shows the MaxCompute data source selected in the previous step. If you use a standard mode DataWorks workspace, the names of the development and production projects are displayed. |
| Table | Select the MaxCompute table to write to. If you use a standard mode DataWorks workspace, make sure that a MaxCompute table with the same name and schema exists in both the development and production environments. You can also click Generate Target Table Schema to have the system automatically create a table to receive the data, and you can manually adjust the generated table creation statement. |
| Partition | If the table is a partitioned table, enter values for the partition key columns. For this tutorial, set the partition value to the scheduling parameter ${partition}. |
Step 3: Configure field mapping
After you select the data source and destination, you must specify the column mapping between the reader and writer. You can select Map Fields with Same Name, Map Fields in Same Line, Clear Mappings, or Manually Edit Mapping.
There are six default fields on the Kafka side, listed in the following table; a sketch of where they appear on a consumed record follows the table.
| Field Name | Description |
| --- | --- |
| `__key__` | The key of the Kafka record. |
| `__value__` | The value of the Kafka record. |
| `__partition__` | The number of the partition that contains the Kafka record. Partition numbers start from 0. |
| `__headers__` | The headers of the Kafka record. |
| `__offset__` | The offset of the Kafka record in its partition. Offsets start from 0. |
| `__timestamp__` | The 13-digit integer millisecond timestamp of the Kafka record. |
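For reference, these six default fields correspond to the attributes of a consumed Kafka record. Below is a minimal sketch, assuming the kafka-python client; the broker address and topic name are placeholders.

```python
# Minimal sketch: where the six default Kafka-side fields appear on a consumed
# record. Assumes the kafka-python package; broker and topic are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer("your_topic", bootstrap_servers="broker-host:9092")

for record in consumer:
    print(record.key)        # __key__
    print(record.value)      # __value__
    print(record.partition)  # __partition__, partition numbers start from 0
    print(record.headers)    # __headers__
    print(record.offset)     # __offset__, offsets start from 0
    print(record.timestamp)  # __timestamp__, 13-digit millisecond timestamp
    break
```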
You can configure custom JSON parsing for fields on the Kafka side. Use the `.` syntax (to retrieve a subfield) and the `[]` syntax (to retrieve an array element) to obtain content from the value field of a JSON-formatted Kafka record (see the sketch after these examples).

Important: If a JSON field name contains a "." character, you cannot retrieve the field value by defining the field, because this causes ambiguity in the field definition syntax.

The following is an example of the data value of a JSON-formatted record in Kafka.

{
  "a": { "a1": "hello" },
  "b": "world",
  "c": [ "xxxxxxx", "yyyyyyy" ],
  "d": [
    { "AA": "this", "BB": "is_data" },
    { "AA": "that", "BB": "is_also_data" }
  ],
  "a.b": "unreachable"
}

To synchronize the data for a1, "hello", add the field `a.a1` on the Kafka side.
To synchronize the data for b, "world", add the field `b` on the Kafka side.
To synchronize the data for c, "yyyyyyy", add the field `c[1]` on the Kafka side.
To synchronize the data for AA, "this", add the field `d[0].AA` on the Kafka side.
If the field on the Kafka side is defined as `a.b`, the data "unreachable" cannot be synchronized.
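To make the path syntax concrete, the following sketch shows how such `.` and `[]` paths resolve against the example value above. The parser is a simplified illustration written for this tutorial, not the synchronization task's actual implementation.

```python
# Simplified illustration of the "." and "[]" path syntax against the example
# JSON value above; this is not the synchronization task's own parser.
import json
import re

value = json.loads("""
{
  "a": {"a1": "hello"},
  "b": "world",
  "c": ["xxxxxxx", "yyyyyyy"],
  "d": [{"AA": "this", "BB": "is_data"}, {"AA": "that", "BB": "is_also_data"}],
  "a.b": "unreachable"
}
""")

def extract(data, path):
    """Resolve a path such as 'a.a1', 'c[1]', or 'd[0].AA'."""
    for part in path.split("."):
        # Split a segment such as 'd[0]' into the key 'd' and the index 0.
        match = re.fullmatch(r"([^\[\]]+)(?:\[(\d+)\])?", part)
        key, index = match.group(1), match.group(2)
        data = data[key]
        if index is not None:
            data = data[int(index)]
    return data

print(extract(value, "a.a1"))     # hello
print(extract(value, "b"))        # world
print(extract(value, "c[1]"))     # yyyyyyy
print(extract(value, "d[0].AA"))  # this
# extract(value, "a.b") splits into ["a", "b"] and raises KeyError, which is
# why the top-level "a.b" field cannot be reached through this syntax.
```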
Source or destination table fields can be excluded from the mapping. The synchronization instance does not read unmapped source fields. NULL is written to unmapped destination fields.
You cannot map one source field to multiple destination fields. A destination field also cannot be mapped from multiple source fields.
Step 4: Configure advanced parameters
Click Advanced Configuration on the right side of the task to set parameters such as Maximum Expected Concurrency and Policy for Dirty Data Records. For this tutorial, set the Policy for Dirty Data Records to Ignore Dirty Data Records and leave the other parameters at their default values. For more information, see Codeless UI Configuration.
Step 5: Configure and run a test
On the right side of the offline synchronization node editing page, click Debugging Configurations. Set the Resource Group and Script Parameters for the test run. Then, click Run in the top toolbar to test whether the synchronization link runs successfully.
You can click the icon in the navigation pane on the left and then click the icon to the right of Personal Directory to create a file with the .sql extension. You can then execute the following SQL query to check whether the data in the destination table is as expected.

Note: To query data in this way, you must attach the destination MaxCompute project to DataWorks as a computing resource.

On the .sql file editing page, click Debugging Configurations on the right. Specify the data source Type, Computing Resource, and Resource Group. Then, click Run in the top toolbar.
SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;
Step 6: Configure scheduling and publish the task
Click Scheduling on the right side of the offline synchronization task. Set the scheduling configuration parameters for the recurring run. Then, click Publish in the top toolbar to open the publishing panel. Follow the on-screen instructions to publish the task.
When you configured the data source and destination, you used three scheduling parameters: ${startTime}, ${endTime}, and ${partition}. In the scheduling configuration, you must specify the replacement policy for these parameters based on your synchronization needs. The following are configuration examples for several typical scenarios.
| Typical scenario | Recommended configuration | Scenario description |
| --- | --- | --- |
| The synchronization task is scheduled every 5 minutes | | If the synchronization task is scheduled to start at 10:00 on 2022-11-22: |
| The synchronization task is scheduled every hour | | If the synchronization task is scheduled to start at 10:05 on 2022-11-22: |
| The synchronization task is scheduled every day | | If the synchronization task is scheduled to start at 00:05 on 2022-11-22: |
| The synchronization task is scheduled every week | | If the synchronization task is scheduled to start at 00:05 on 2022-11-22: |
| The synchronization task is scheduled every month | | If the synchronization task is scheduled to start at 00:05 on 2022-11-22: |
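To illustrate how ${startTime} and ${endTime} define each instance's time window, the following sketch computes the window from the scheduled run time. The 5-minute interval and the yyyymmddhh24miss-style formatting are assumptions for illustration; the actual values are produced by the scheduling parameter expressions that you configure.

```python
# Minimal sketch: compute the startTime/endTime window for one scheduled
# instance. The 5-minute interval and the timestamp format are assumptions for
# illustration; the real values come from the scheduling parameter expressions
# configured in DataWorks.
from datetime import datetime, timedelta

def window_for_instance(scheduled_time: datetime, interval: timedelta):
    """Each instance reads the Kafka records whose business timestamps fall
    within the window from start_time to end_time."""
    end_time = scheduled_time
    start_time = scheduled_time - interval
    fmt = "%Y%m%d%H%M%S"  # assumed yyyymmddhh24miss-style formatting
    return start_time.strftime(fmt), end_time.strftime(fmt)

# Example: an instance scheduled at 10:00 on 2022-11-22 with a 5-minute cycle.
start, end = window_for_instance(datetime(2022, 11, 22, 10, 0), timedelta(minutes=5))
print(start, end)  # 20221122095500 20221122100000
```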
Set the scheduling cycle based on the desired interval.
| Typical scenario | Recommended configuration | Scenario description |
| --- | --- | --- |
| The synchronization task is scheduled every 5 minutes | | None |
| The synchronization task is scheduled every hour | | Set the start time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts. |
| The synchronization task is scheduled every day | | Set the scheduled time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts. |
| The synchronization task is scheduled every week | | Set the scheduled time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts. |
| The synchronization task is scheduled every month | | Set the scheduled time to be slightly later than 00:00, such as 00:15. This practice ensures that all data for the time range has been written to the Kafka topic before the synchronization task instance starts. |
If records with timestamps that are earlier than or equal to the start time are written to the Kafka topic after the instance starts, these records might not be read. If data writes to the Kafka topic are delayed or timestamps are out of order, be aware of the risk of data loss for the offline synchronization task.