This topic describes how to configure a batch synchronization node that can be scheduled by minute, hour, or day to synchronize incremental data from Kafka to the hourly or daily partitions in a MaxCompute table.
Precautions
- The version of the Kafka data source that you use must range from 0.10.2 to 2.2.x, and timestamps must be used to record the time when messages are transferred to Kafka.
- If messages whose timestamps are earlier than or equal to the start offset of your batch synchronization node are transferred to the specified Kafka topic after the node starts to synchronize incremental data from Kafka, the messages may be missed. In this case, you must pay attention to data missing risks caused by latency or timestamp disorder of messages transferred to the Kafka topic.
- You can set the Exit Strategy parameter to Exit when poll nothing in 1 minute for your batch synchronization node in the codeless user interface (UI) only if the following conditions are met. If you configure this setting but the conditions are not met, some incremental data in Kafka may be missed when the node is run.
- No data is written to all or some partitions in the specified Kafka topic for 10 or more minutes.
- After each auto triggered instance generated for the batch synchronization node is started, no messages whose timestamps are earlier than the value of the End Time parameter are written to the specified Kafka topic.
Make preparations
- Create a workspace in which you want to run a batch synchronization node and an exclusive resource group for Data Integration that can be used to synchronize data. For information about how to create an exclusive resource group for Data Integration, see Create and use an exclusive resource group for Data Integration. In this example, a workspace in standard mode and an exclusive resource group for Data Integration are created, and the resource group is associated with the workspace.
- Add a Kafka data source to the workspace and test the network connectivity between the Kafka data source and the exclusive resource group for Data Integration. For more information, see Add a Kafka data source. Important When you configure a batch synchronization node to synchronize incremental data from Kafka in a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. This way, when the batch synchronization node is run, the node can synchronize data from the topic in the Kafka data source used in the production environment.
- Prepare a MaxCompute data source. You can associate a MaxCompute compute engine with the workspace to enable the system to generate a default MaxCompute data source. You can also directly add a MaxCompute data source to the workspace. For more information, see Associate a MaxCompute compute engine with a workspace or Add a MaxCompute data source.
Create a batch synchronization node
You can find the required workflow in the Scheduled Workflow pane of the DataStudio page in the DataWorks console and create a batch synchronization node in the workflow. When you create a batch synchronization node, you must configure the parameters such as Path and Name. For more information, see Configure a batch synchronization node by using the codeless UI.
Configure the source
Parameter | Description |
---|---|
Connection and Topic | The name of the Kafka topic from which you want to read data. If you use a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. You can select the topic from the Topic drop-down list. Note Before you configure the Topic parameter, take note of the following items:
|
Consumer Group ID | The consumer group ID. The ID can be used to collect statistics on and monitor data consumption in the Kafka data source. You can configure this parameter based on your business requirements. If you configure this parameter, make sure that the ID is unique in the Kafka data source. |
Kafka Version | The version range of the Kafka data source. Note The version of the Kafka data source must range from 0.10.2 to 2.2.x. |
Read From, Start Time, Read To, and End Time | You can set Read From and Read To to Specific Offset, Start Time to ${startTime} , and End Time to ${endTime} . ${startTime} and ${endTime} are scheduling parameters. These parameters specify the start offset and end offset of data synchronization. In this example, the batch synchronization node synchronizes data within the time range specified by the |
Time Zone | You can leave this parameter empty or retain the default value. The default value of this parameter is the time zone of the server in the region where the DataWorks workspace resides. Note If you have asked the technical personnel to change the time zone in which you want to schedule nodes, you can select the new time zone. |
Key Type, Value Type, and Encoding | You can configure these parameters based on your business requirements. |
Exit Strategy | You can set this parameter to Exit when poll nothing in 1 minute only if the following conditions are met. If the following conditions are not met, set this parameter to Exit when reach configured end offset or time.
|
Advanced Settings | You can retain the default value. |
Configure the destination
Parameter | Description |
---|---|
Connection | The name of the MaxCompute data source that you prepared. |
Table | The name of the table to which you want to write data. If you use a workspace in standard mode, you must make sure that the MaxCompute data source used in the production environment of the workspace contains a table whose name and schema are the same as those of a table in the MaxCompute data source used in the development environment of the workspace. Note Before you configure the Table parameter, take note of the following items:
If the MaxCompute table to which you want to write data is not created, you can click Generate Destination Table Schema to quickly generate a table creation statement. For more information, see Appendix: Generate the schema of a destination table. |
Partition Key Column | In this example, the destination MaxCompute table contains one partition key column ds, and the ${partition} scheduling parameter is entered in the Partition Key Column field. This way, each time the batch synchronization node is run, data is written to the partition specified by the ${partition} scheduling parameter. The system replaces the ${partition} scheduling parameter with the actual value based on the scheduling configurations of the node. Note Whether the Partition Key Column parameter needs to be configured and the number of times the system displays the Partition Key Column parameter vary based on the type of the destination MaxCompute table that you select. If you select a non-partitioned destination MaxCompute table, you do not need to manually configure this parameter. If you select a partitioned destination MaxCompute table, the system displays the Partition Key Column parameter based on the number of partition key columns in the table and the names of the partition key columns. The number of times that the system displays the Partition Key Column parameter is the same as the number of partition key columns in the table. |
Configure field mappings
- In the Mappings step, you can edit fields in the Kafka data source.
- The following table describes the default fields in Kafka.
Field name Description __key__ The key of a Kafka message. __value__ The value of a Kafka message. __partition__ The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0. __headers__ The headers of a Kafka message. __offset__ The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0. __timestamp__ The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds. - You can specify a method for parsing JSON-formatted content in Kafka messages based on your business requirements. You can use the
.Sub-field
or[Element in an array]
syntax to obtain data in the JSON-formatted values of a message.The following code provides an example of data in the JSON-formatted values of a message.Important If the name of a field in the JSON-formatted values of a Kafka message contains periods (.), the value of the field cannot be obtained or synchronized because of a field definition syntax error.{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ], "a.b": "unreachable" }
- If you want to synchronize
"hello"
in a1, add thea.a1
field to the Kafka data source. - If you want to synchronize
"world"
in b, add theb
field to the Kafka data source. - If you want to synchronize
"yyyyyyy"
in c, add thec[1]
field to the Kafka data source. - If you want to synchronize
"this"
in AA, add thed[0].AA
field to the Kafka data source. - If you add the
a.b
to the Kafka data source,"unreachable"
cannot be synchronized.
- If you want to synchronize
- The following table describes the default fields in Kafka.
- In the Mappings step, you can configure mappings between fields in the Kafka data source and fields in the MaxCompute data source.
- DataWorks allows the existence of fields that have no mapped fields. If some fields in the source have no mapped fields in the destination, the batch synchronization node does not synchronize the fields from the source. If some fields in the destination have no mapped fields in the source, NULL is written to the fields in the destination as values.
- Each field in the source can map to only one field in the destination, and each field in the destination can have only one mapped field in the source.
Configure scheduling settings
This section describes the scheduling settings that can be configured for a batch synchronization node. For information about common scheduling settings and all scheduling settings that can be configured for a batch synchronization node, see the topics in the Schedule directory.
- Configure scheduling parameters. In the preceding configurations of the batch synchronization node, the following scheduling parameters are used:
${startTime}
,${endTime}
, and${partition}
. You must specify value formats for the scheduling parameters based on your business requirements when you configure scheduling settings for the node. The following table provides configuration examples for some typical business scenarios.Scenario Recommended configuration Sample scenario description You want to schedule the batch synchronization node every 5 minutes. - startTime=$[yyyymmddhh24mi-8/24/60]00
- endTime=$[yyyymmddhh24mi-3/24/60]00
- partition=$[yyyymmddhh24mi-8/24/60]
If the batch synchronization node is scheduled at 10:00 on November 22, 2022, the following situations occur: - Data in the time range from 09:52 on November 22, 2022 to 09:57 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
- The data synchronized from the specified Kafka topic is written to the 202211220952 partition in the MaxCompute table.
- The time specified by
endTime
is 3 minutes earlier than the scheduling time of the batch synchronization node. The scheduling time of the batch synchronization node is specified by $[yyyymmddhh24mi]. This configuration helps ensure that data in the specified time range is written to the specified Kafka topic before the node starts to read the data and prevents data missing.
You want to schedule the batch synchronization node every hour. - startTime=$[yyyymmddhh24-1/24]0000
- endTime=$[yyyymmddhh24]0000
- partition=$[yyyymmddhh24]
Note- If you want to schedule the batch synchronization node every 2 hours, set startTime to $[yyyymmddhh24-2/24]0000 and retain the settings of the endTime and partition parameters.
- If you want to schedule the batch synchronization node every 3 hours, set startTime to $[yyyymmddhh24-3/24]00000 and retain the settings of the endTime and partition parameters.
- You can refer to the preceding two configuration examples to configure scheduling parameters for a batch synchronization node that is scheduled by hour.
If the batch synchronization node is scheduled at 10:05 on November 22, 2022, the following situations occur: - Data in the time range from 09:00 on November 22, 2022 to 10:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
- The data synchronized from the specified Kafka topic is written to the 2022112210 partition in the MaxCompute table.
You want to schedule the batch synchronization node every day. - startTime=$[yyyymmdd-1]000000
- endTime=$[yyyymmdd]000000
- partition=$[yyyymmdd-1]
If the batch synchronization node is scheduled at 00:05 on November 22, 2022, the following situations occur: - Data in the time range from 00:00 on November 21, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
- The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
You want to schedule the batch synchronization node every week. - startTime=$[yyyymmdd-7]000000
- endTime=$[yyyymmdd]000000
- partition=$[yyyymmdd-1]
If the batch synchronization node is scheduled at 00:05 on November 22, 2022, the following situations occur: - Data in the time range from 00:00 on November 15, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
- The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
You want to schedule the batch synchronization node every month. - startTime=$[add_months(yyyymmdd,-1)]000000
- endTime=$[yyyymmdd]000000
- partition=$[yyyymmdd-1]
If the batch synchronization node is scheduled at 00:05 on November 22, 2022, the following situations occur: - Data in the time range from 00:00 on October 22, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
- The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
- Configure a scheduling cycle.
- You can configure a scheduling cycle for the batch synchronization node based on the frequency at which you want to schedule the node.
Scenario Recommended configuration Sample scenario description You want to schedule the batch synchronization node every 5 minutes. - Scheduling Cycle: Set this parameter to Minute.
- Start From: Set this parameter to 00:00.
- Interval: Set this parameter to 05. Unit: minutes.
- End At: Set this parameter to 23:59.
N/A. You want to schedule the batch synchronization node every hour. - Scheduling Cycle: Set this parameter to Hour.
- Start From: Set this parameter to 00:15.
- Interval: Set this parameter to 1. Unit: hours.
- End At: Set this parameter to 23:59.
You can specify a point in time that is later than 00:00, such as 00:15, for the Start From parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data. You want to schedule the batch synchronization node every day. - Scheduling Cycle: Set this parameter to Day.
- Run At: Set this parameter to 00:15.
You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data. You want to schedule the batch synchronization node every week. - Scheduling Cycle: Set this parameter to Week.
- Run Every: Set this parameter to Monday.
- Run At: Set this parameter to 00:15.
You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data. You want to schedule the batch synchronization node every month. - Scheduling Cycle: Set this parameter to Month.
- Run Every: Set this parameter to Day 1.
- Run At: Set this parameter to 00:15.
You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data. Important If messages whose timestamps are earlier than or equal to the start offset of your batch synchronization node are written to the specified Kafka topic after the node starts to synchronize data from Kafka, the data may be missed. In this case, you must pay attention to the data missing risks caused by latency or timestamp disorder of data transferred to the Kafka topic. - Configure the rerun attribute. We recommend that you select the Auto Rerun upon Error check box and set the Number of re-runs parameter to 3 and the Rerun interval parameter to 2. This way, if an error occurs on the batch synchronization node, the node can automatically rerun.
- You can configure a scheduling cycle for the batch synchronization node based on the frequency at which you want to schedule the node.
- Configure a resource group for scheduling.
You need to select a resource group for scheduling. We recommend that you use an exclusive resource group for scheduling.
- Configure scheduling dependencies.
The batch synchronization node configured in this example does not depend on other nodes. You need to only click Add Root Node in the Dependencies section of the Properties tab to configure scheduling dependencies for the node.
Configure a resource group for Data Integration
You can select the resource group for Data Integration that is connected to the Kafka and MaxCompute data sources.
Test the node
- Click the icon in the top toolbar of the configuration tab of the batch synchronization node. In the dialog box that appears, assign values to the
${startTime}
,${endTime}
, and${partition}
scheduling parameters and select a resource group for scheduling. - Check the running results of the batch synchronization node.
- Create an ad hoc query node and execute the following statements to check whether all data that is read from Kafka is correctly written to MaxCompute:
select * from test_project.test_table where ds=2022112200 limit 10; select count(*) from test_project.test_table where ds=2022112200;
Commit and deploy the batch synchronization node
If the test on the batch synchronization node is successful, you can save the configurations of the node and commit and deploy the node to Operation Center. The batch synchronization node periodically synchronizes data from Kafka to MaxCompute. For information about how to deploy a node, see Deploy nodes.
Appendix: Generate the schema of a destination table
- The destination MaxCompute table name defined in the statement is the same as the name of the Kafka topic that you specified.
- The destination MaxCompute table contains six fields, and the fields map to the following Kafka fields.
Field name Description __key__ The key of a Kafka message. __value__ The value of a Kafka message. __partition__ The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0. __headers__ The headers of a Kafka message. __offset__ The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0. __timestamp__ The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds. - By default, the lifecycle of a MaxCompute table is 100 years.