This topic describes how to configure a batch synchronization node that can be scheduled by minute, hour, or day to synchronize incremental data from Kafka to the hourly or daily partitions in a MaxCompute table.

Precautions

  • The version of the Kafka data source that you use must range from 0.10.2 to 2.2.x, and timestamps must be used to record the time when messages are transferred to Kafka.
  • If messages whose timestamps are earlier than or equal to the start offset of your batch synchronization node are transferred to the specified Kafka topic after the node starts to synchronize incremental data from Kafka, the messages may be missed. In this case, you must pay attention to data missing risks caused by latency or timestamp disorder of messages transferred to the Kafka topic.
  • You can set the Exit Strategy parameter to Exit when poll nothing in 1 minute for your batch synchronization node in the codeless user interface (UI) only if the following conditions are met. If you configure this setting but the conditions are not met, some incremental data in Kafka may be missed when the node is run.
    • No data is written to all or some partitions in the specified Kafka topic for 10 or more minutes.
    • After each auto triggered instance generated for the batch synchronization node is started, no messages whose timestamps are earlier than the value of the End Time parameter are written to the specified Kafka topic.

Make preparations

  • Create a workspace in which you want to run a batch synchronization node and an exclusive resource group for Data Integration that can be used to synchronize data. For information about how to create an exclusive resource group for Data Integration, see Create and use an exclusive resource group for Data Integration. In this example, a workspace in standard mode and an exclusive resource group for Data Integration are created, and the resource group is associated with the workspace.
  • Add a Kafka data source to the workspace and test the network connectivity between the Kafka data source and the exclusive resource group for Data Integration. For more information, see Add a Kafka data source.
    Important When you configure a batch synchronization node to synchronize incremental data from Kafka in a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. This way, when the batch synchronization node is run, the node can synchronize data from the topic in the Kafka data source used in the production environment.
  • Prepare a MaxCompute data source. You can associate a MaxCompute compute engine with the workspace to enable the system to generate a default MaxCompute data source. You can also directly add a MaxCompute data source to the workspace. For more information, see Associate a MaxCompute compute engine with a workspace or Add a MaxCompute data source.

Create a batch synchronization node

You can find the required workflow in the Scheduled Workflow pane of the DataStudio page in the DataWorks console and create a batch synchronization node in the workflow. When you create a batch synchronization node, you must configure the parameters such as Path and Name. For more information, see Configure a batch synchronization node by using the codeless UI.

Configure the source

In the Connections step, you can configure parameters related to the source. In this example, a batch synchronization node is created to synchronize incremental data from Kafka to MaxCompute. The following figure shows the parameters that can be configured for the source. Configure the source
Note For more information about the parameters that can be configured for a batch synchronization node used to synchronize data from Kafka, see Kafka Reader. The following table describes the parameters that need to be configured in this example.
ParameterDescription
Connection and TopicThe name of the Kafka topic from which you want to read data. If you use a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. You can select the topic from the Topic drop-down list.
Note Before you configure the Topic parameter, take note of the following items:
  • If the Kafka data source used in the development environment does not contain the topic, the topic is not displayed in the drop-down list when you configure the Topic parameter.
  • If the Kafka data source used in the production environment does not contain the topic, the batch synchronization node fails after the node is committed and deployed. This is because the node cannot find the topic in the production environment.
Consumer Group IDThe consumer group ID. The ID can be used to collect statistics on and monitor data consumption in the Kafka data source. You can configure this parameter based on your business requirements. If you configure this parameter, make sure that the ID is unique in the Kafka data source.
Kafka VersionThe version range of the Kafka data source.
Note The version of the Kafka data source must range from 0.10.2 to 2.2.x.
Read From, Start Time, Read To, and End TimeYou can set Read From and Read To to Specific Offset, Start Time to ${startTime}, and End Time to ${endTime}. ${startTime} and ${endTime} are scheduling parameters.

These parameters specify the start offset and end offset of data synchronization. In this example, the batch synchronization node synchronizes data within the time range specified by the ${startTime} and ${endTime} scheduling parameters. When the batch synchronization node is run, the system replaces ${startTime} and ${endTime} with actual values based on the scheduling configurations of the node.

Time ZoneYou can leave this parameter empty or retain the default value. The default value of this parameter is the time zone of the server in the region where the DataWorks workspace resides.
Note If you have asked the technical personnel to change the time zone in which you want to schedule nodes, you can select the new time zone.
Key Type, Value Type, and EncodingYou can configure these parameters based on your business requirements.
Exit StrategyYou can set this parameter to Exit when poll nothing in 1 minute only if the following conditions are met. If the following conditions are not met, set this parameter to Exit when reach configured end offset or time.
  • No data is written to all or some partitions in the specified Kafka topic for 10 or more minutes.
  • After each auto triggered instance generated for the batch synchronization node is started, no messages whose timestamps are earlier than the value of the End Time parameter are written to the specified Kafka topic.
Advanced SettingsYou can retain the default value.

Configure the destination

In the Connections step, you can configure parameters related to the destination. In this example, a batch synchronization node is created to synchronize incremental data from Kafka to MaxCompute. The following figure shows the parameters that can be configured for the destination. Configure the destination
ParameterDescription
ConnectionThe name of the MaxCompute data source that you prepared.
TableThe name of the table to which you want to write data. If you use a workspace in standard mode, you must make sure that the MaxCompute data source used in the production environment of the workspace contains a table whose name and schema are the same as those of a table in the MaxCompute data source used in the development environment of the workspace.
Note Before you configure the Table parameter, take note of the following items:
  • If the MaxCompute data source used in the development environment does not contain the table, the table is not displayed in the drop-down list when you configure the Table parameter.
  • If the MaxCompute data source used in the production environment does not contain the table, the batch synchronization node fails after the node is committed and deployed. This is because the node cannot find the table in the production environment.
  • If the schema of the table in the production environment is different from the schema of the table in the development environment, field mappings that are established when the batch synchronization node is run may be different from the field mappings that are configured in the Mappings step. As a result, data may be incorrectly written to the destination.

If the MaxCompute table to which you want to write data is not created, you can click Generate Destination Table Schema to quickly generate a table creation statement. For more information, see Appendix: Generate the schema of a destination table.

Partition Key ColumnIn this example, the destination MaxCompute table contains one partition key column ds, and the ${partition} scheduling parameter is entered in the Partition Key Column field. This way, each time the batch synchronization node is run, data is written to the partition specified by the ${partition} scheduling parameter. The system replaces the ${partition} scheduling parameter with the actual value based on the scheduling configurations of the node.
Note Whether the Partition Key Column parameter needs to be configured and the number of times the system displays the Partition Key Column parameter vary based on the type of the destination MaxCompute table that you select. If you select a non-partitioned destination MaxCompute table, you do not need to manually configure this parameter. If you select a partitioned destination MaxCompute table, the system displays the Partition Key Column parameter based on the number of partition key columns in the table and the names of the partition key columns. The number of times that the system displays the Partition Key Column parameter is the same as the number of partition key columns in the table.
You can retain default values for the parameters that are not described in the table.

Configure field mappings

  1. In the Mappings step, you can edit fields in the Kafka data source.
    • The following table describes the default fields in Kafka.
      Field nameDescription
      __key__The key of a Kafka message.
      __value__The value of a Kafka message.
      __partition__The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0.
      __headers__The headers of a Kafka message.
      __offset__The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0.
      __timestamp__The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds.
    • You can specify a method for parsing JSON-formatted content in Kafka messages based on your business requirements. You can use the .Sub-field or [Element in an array] syntax to obtain data in the JSON-formatted values of a message.
      Important If the name of a field in the JSON-formatted values of a Kafka message contains periods (.), the value of the field cannot be obtained or synchronized because of a field definition syntax error.
      The following code provides an example of data in the JSON-formatted values of a message.
      {
            "a": {
            "a1": "hello"
            },
            "b": "world",
            "c":[
                  "xxxxxxx",
                  "yyyyyyy"
                  ],
            "d":[
                  {
                        "AA":"this",
                        "BB":"is_data"
                  },
                  {
                        "AA":"that",
                        "BB":"is_also_data"
                  }
              ],
           "a.b": "unreachable"
      }
      • If you want to synchronize "hello" in a1, add the a.a1 field to the Kafka data source.
      • If you want to synchronize "world" in b, add the b field to the Kafka data source.
      • If you want to synchronize "yyyyyyy" in c, add the c[1] field to the Kafka data source.
      • If you want to synchronize "this" in AA, add the d[0].AA field to the Kafka data source.
      • If you add the a.b to the Kafka data source, "unreachable" cannot be synchronized.
  2. In the Mappings step, you can configure mappings between fields in the Kafka data source and fields in the MaxCompute data source.
    • DataWorks allows the existence of fields that have no mapped fields. If some fields in the source have no mapped fields in the destination, the batch synchronization node does not synchronize the fields from the source. If some fields in the destination have no mapped fields in the source, NULL is written to the fields in the destination as values.
    • Each field in the source can map to only one field in the destination, and each field in the destination can have only one mapped field in the source.

Configure scheduling settings

This section describes the scheduling settings that can be configured for a batch synchronization node. For information about common scheduling settings and all scheduling settings that can be configured for a batch synchronization node, see the topics in the Schedule directory.

  • Configure scheduling parameters.
    In the preceding configurations of the batch synchronization node, the following scheduling parameters are used: ${startTime}, ${endTime}, and ${partition}. You must specify value formats for the scheduling parameters based on your business requirements when you configure scheduling settings for the node. The following table provides configuration examples for some typical business scenarios.
    ScenarioRecommended configurationSample scenario description
    You want to schedule the batch synchronization node every 5 minutes.Scheduling parameters
    • startTime=$[yyyymmddhh24mi-8/24/60]00
    • endTime=$[yyyymmddhh24mi-3/24/60]00
    • partition=$[yyyymmddhh24mi-8/24/60]
    If the batch synchronization node is scheduled at 10:00 on November 22, 2022, the following situations occur:
    • Data in the time range from 09:52 on November 22, 2022 to 09:57 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
    • The data synchronized from the specified Kafka topic is written to the 202211220952 partition in the MaxCompute table.
    • The time specified by endTime is 3 minutes earlier than the scheduling time of the batch synchronization node. The scheduling time of the batch synchronization node is specified by $[yyyymmddhh24mi]. This configuration helps ensure that data in the specified time range is written to the specified Kafka topic before the node starts to read the data and prevents data missing.
    You want to schedule the batch synchronization node every hour.Scheduling parameters
    • startTime=$[yyyymmddhh24-1/24]0000
    • endTime=$[yyyymmddhh24]0000
    • partition=$[yyyymmddhh24]
    Note
    • If you want to schedule the batch synchronization node every 2 hours, set startTime to $[yyyymmddhh24-2/24]0000 and retain the settings of the endTime and partition parameters.
    • If you want to schedule the batch synchronization node every 3 hours, set startTime to $[yyyymmddhh24-3/24]00000 and retain the settings of the endTime and partition parameters.
    • You can refer to the preceding two configuration examples to configure scheduling parameters for a batch synchronization node that is scheduled by hour.
    If the batch synchronization node is scheduled at 10:05 on November 22, 2022, the following situations occur:
    • Data in the time range from 09:00 on November 22, 2022 to 10:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
    • The data synchronized from the specified Kafka topic is written to the 2022112210 partition in the MaxCompute table.
    You want to schedule the batch synchronization node every day.Scheduling parameters
    • startTime=$[yyyymmdd-1]000000
    • endTime=$[yyyymmdd]000000
    • partition=$[yyyymmdd-1]
    If the batch synchronization node is scheduled at 00:05 on November 22, 2022, the following situations occur:
    • Data in the time range from 00:00 on November 21, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
    • The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
    You want to schedule the batch synchronization node every week.Scheduling parameters
    • startTime=$[yyyymmdd-7]000000
    • endTime=$[yyyymmdd]000000
    • partition=$[yyyymmdd-1]
    If the batch synchronization node is scheduled at 00:05 on November 22, 2022, the following situations occur:
    • Data in the time range from 00:00 on November 15, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
    • The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
    You want to schedule the batch synchronization node every month.Scheduling parameters
    • startTime=$[add_months(yyyymmdd,-1)]000000
    • endTime=$[yyyymmdd]000000
    • partition=$[yyyymmdd-1]
    If the batch synchronization node is scheduled at 00:05 on November 22, 2022, the following situations occur:
    • Data in the time range from 00:00 on October 22, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
    • The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
  • Configure a scheduling cycle.
    • You can configure a scheduling cycle for the batch synchronization node based on the frequency at which you want to schedule the node.
      ScenarioRecommended configurationSample scenario description
      You want to schedule the batch synchronization node every 5 minutes.
      • Scheduling Cycle: Set this parameter to Minute.
      • Start From: Set this parameter to 00:00.
      • Interval: Set this parameter to 05. Unit: minutes.
      • End At: Set this parameter to 23:59.
      N/A.
      You want to schedule the batch synchronization node every hour.
      • Scheduling Cycle: Set this parameter to Hour.
      • Start From: Set this parameter to 00:15.
      • Interval: Set this parameter to 1. Unit: hours.
      • End At: Set this parameter to 23:59.
      You can specify a point in time that is later than 00:00, such as 00:15, for the Start From parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data.
      You want to schedule the batch synchronization node every day.
      • Scheduling Cycle: Set this parameter to Day.
      • Run At: Set this parameter to 00:15.
      You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data.
      You want to schedule the batch synchronization node every week.
      • Scheduling Cycle: Set this parameter to Week.
      • Run Every: Set this parameter to Monday.
      • Run At: Set this parameter to 00:15.
      You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data.
      You want to schedule the batch synchronization node every month.
      • Scheduling Cycle: Set this parameter to Month.
      • Run Every: Set this parameter to Day 1.
      • Run At: Set this parameter to 00:15.
      You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization node starts to read data.
      Important If messages whose timestamps are earlier than or equal to the start offset of your batch synchronization node are written to the specified Kafka topic after the node starts to synchronize data from Kafka, the data may be missed. In this case, you must pay attention to the data missing risks caused by latency or timestamp disorder of data transferred to the Kafka topic.
    • Configure the rerun attribute. We recommend that you select the Auto Rerun upon Error check box and set the Number of re-runs parameter to 3 and the Rerun interval parameter to 2. This way, if an error occurs on the batch synchronization node, the node can automatically rerun.
  • Configure a resource group for scheduling.

    You need to select a resource group for scheduling. We recommend that you use an exclusive resource group for scheduling.

  • Configure scheduling dependencies.

    The batch synchronization node configured in this example does not depend on other nodes. You need to only click Add Root Node in the Dependencies section of the Properties tab to configure scheduling dependencies for the node.

Configure a resource group for Data Integration

You can select the resource group for Data Integration that is connected to the Kafka and MaxCompute data sources.

Test the node

After the preceding configuration is complete, you can test the batch synchronization node on the DataStudio page to check whether the node can run as expected.
  1. Click the Run with Parameters icon in the top toolbar of the configuration tab of the batch synchronization node. In the dialog box that appears, assign values to the ${startTime}, ${endTime}, and ${partition} scheduling parameters and select a resource group for scheduling. Run with Parameters
  2. Check the running results of the batch synchronization node.
  3. Create an ad hoc query node and execute the following statements to check whether all data that is read from Kafka is correctly written to MaxCompute:
    select * from test_project.test_table where ds=2022112200 limit 10;
    select count(*) from test_project.test_table where ds=2022112200;

Commit and deploy the batch synchronization node

If the test on the batch synchronization node is successful, you can save the configurations of the node and commit and deploy the node to Operation Center. The batch synchronization node periodically synchronizes data from Kafka to MaxCompute. For information about how to deploy a node, see Deploy nodes.

Appendix: Generate the schema of a destination table

After you click Generate Destination Table Schema when you configure the destination, the statement that can be used to create a destination MaxCompute table is generated. The statement defines the name of the table and the fields in the table. Generate Destination Table Schema
  • The destination MaxCompute table name defined in the statement is the same as the name of the Kafka topic that you specified.
  • The destination MaxCompute table contains six fields, and the fields map to the following Kafka fields.
    Field nameDescription
    __key__The key of a Kafka message.
    __value__The value of a Kafka message.
    __partition__The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0.
    __headers__The headers of a Kafka message.
    __offset__The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0.
    __timestamp__The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds.
  • By default, the lifecycle of a MaxCompute table is 100 years.
You can modify the items defined in the statement based on your business requirements. In addition, you can configure the batch synchronization node to parse the JSON-formatted values of Kafka messages and add the fields obtained from the parsing results to the default table creation statement based on your business requirements.