All Products
Search
Document Center

DataWorks:Synchronize data from a Kafka table to OSS (Hudi) in real time

Last Updated:Aug 29, 2023

This topic describes how to use DataWorks Data Integration to synchronize data to a data lake in real time. In the example, data is synchronized from Kafka to Object Storage Service (OSS) in real time.

Limits

Create an exclusive resource group for Data Integration and establish network connections between the resource group and data sources

Before you run a synchronization task to synchronize data, you must establish network connections between your exclusive resource group for Data Integration and data sources. For more information, see Establish a network connection between a resource group and a data source.

Note

When you add an OSS data source, you can specify the endpoint of OSS to establish a network connection between the exclusive resource group for Data Integration and the OSS data source. You do not need to make special configurations.

Add data sources

Add a Kafka data source

On the Data Source page in SettingCenter of the DataWorks console, click Add data source and configure the parameters that are displayed to add a Kafka data source to DataWorks. For more information, see Kafka data source.

Add an OSS data source

On the Data Source page in SettingCenter of the DataWorks console, click Add data source and configure the parameters that are displayed to add an OSS data source to DataWorks. You can add an OSS data source in RAM role-based authorization mode or AccessKey pair-based authorization mode:

  • AccessKey pair-based authorization mode

    • Endpoint: the endpoint of the OSS bucket to which you want to synchronize data. You can obtain the endpoint on the Overview page of the OSS bucket in the OSS console. For information, see Obtain information about the OSS bucket.

    • AccessKey ID: the AccessKey ID that is used to access the OSS data source. You can obtain the AccessKey ID on the Security Management page.

    • AccessKey Secret: the AccessKey secret that is used to access the OSS data source. The AccessKey secret is equivalent to a logon password.

  • RAM role-based authorization mode

    For information about how to add an OSS data source in RAM role-based authorization mode, see Use the RAM role-based authorization mode to add a data source.

After the OSS data source is prepared, select the exclusive resource group for Data Integration that you want to use and click Test connectivity to test the network connectivity between the resource group and data source. If the connectivity status is Connected, a network connection is established between the exclusive resource group for Data Integration and the OSS data source.

Create and configure a synchronization task

  1. Go to the Data Integration page of the DataWorks console. In the left-side navigation pane, click Data Synchronization Node. On the Nodes page, click Create Node to create a synchronization task.

  2. Configure basic information for the synchronization task.

    1. Task name: In this section, specify a name for the synchronization task based on your business requirements.

    2. Synchronization Type: In this section, select Kafka as the source type and OSS as the destination type. After you select the source and destination types, Single table real-time synchronization is automatically selected for the Synchronization Method parameter.

    3. Network and Resource Configuration: In this section, select the Kafka data source, OSS data source, and exclusive resource group for Data Integration that you prepared. Then, click Test Connectivity for All Resource Groups and Data Sources to test network connectivity between the exclusive resource group for Data Integration and the data sources.

  3. Configure the Kafka data source.

    Click KafkaSource in the wizard of the upper part of the configuration page and configure the Kafka data source.

    image.png
    1. Configure basic information for the Kafka data source.

      • Select the Kafka topic from which you want to synchronize data.

      • Retain default values for other parameters, or modify their configurations based on your business requirements.

    2. Click Data Sampling.

      In the dialog box that appears, configure the Start At and Sampled Data Records parameters and click Start Collection. The system samples data from the Kafka topic that you specified. You can preview the data in the Kafka topic. The data in the Kafka topic is used as input data for data preview and visualization configurations of a data processing node.

  4. Configure a data processing node.

    You can click the 添加 icon to add data processing methods. The following data processing methods are supported: Data Masking, Replace String, Data Filtering, JSON Parsing, and Edit Field and Assign Value. You can arrange the data processing methods based on your business requirements. When the synchronization task is run, data is processed based on the processing order that you specify. 数据处理After you configure a data processing node, you can click Preview Data Output in the upper-right corner of the configuration page. In the Preview Data Output dialog box, you can click Re-obtain Output of Ancestor Node to enable the data processing node to process the data that is sampled from the specified Kafka topic and preview the processing result.

    In the Preview Data Output dialog box, you can change the input data or click Manually Construct Data to customize the input data. Then, you can click Preview to preview the result generated after the input data is processed by the data processing node. If an exception occurs on the data processing node or dirty data is generated, the system reports an error in real time. This can help you check the configurations of the data processing node and determine whether expected results can be obtained at the earliest opportunity.

    Note

    Before you preview the result generated after the input data is processed by a data processing node, you must configure data sampling settings for the Kafka data source.

    输出预览
  5. Configure the OSS data source.

    Click OSS in the wizard in the upper part of the configuration page and configure the OSS data source.

    image.png
    1. Configure basic information for the OSS data source.

      • Destination Metadatabase Type: If you have activated Data Lake Formation (DLF) within your Alibaba Cloud account, the system automatically creates a metadatabase and a metatable in DLF when data is synchronized to a data lake.

        Note

        Cross-region metadatabase and metatable creation is not supported.

      • Destination Database: Select the name of the database to which you want to write data. You can also click Create Database to create a DLF metadatabase.

      • Destination Table: Select the generation method of the OSS table to which you want to write data. Valid values: Create Table and Use Existing Table.

      • Table Name: Enter or select the name of the OSS table to which you want to write data.

      • Storage Path: Select the OSS path in which you want to store the synchronized data.

    1. Edit the schema for the destination OSS table that is automatically created.

      If you select Create Table for the Destination Table parameter, click Edit Table Schema. In the dialog box that appears, edit the schema for the destination OSS table that is automatically created. You can also click Re-generate Table Schema Based on Output Column of Ancestor Node to re-generate a table schema based on the output columns of an ancestor node. You can select a column from the generated table schema and configure the column as the primary key.

    2. Configure mappings between fields in the source and fields in the destination.

      After you configure basic information for the OSS data source with the Destination Table parameter set to Use Existing Table or save the table schema settings, the system automatically establishes mappings between columns in the source and columns in the destination based on the same-name mapping principle. You can modify the mappings based on your business requirements. One column in the source can map to multiple columns in the destination. Multiple columns in the source cannot map to the same column in the destination. If a column in the source has no mapped column in the destination, data in the column in the source is not synchronized to the destination.

      image.png
    3. Configure processing policies for dynamic columns generated by an ancestor data processing node.

      Processing policies for dynamic columns generated by a data processing node are used to control the processing methods of dynamic columns that are generated by an ancestor data processing node. Only JSON parsing nodes can generate dynamic columns. If you make configurations in the Dynamic Output Field section when you configure a JSON parsing node, you must configure processing policies for dynamic columns that are generated by the JSON parsing node.

      Dynamic columns refer to columns that do not have fixed column names. The synchronization task automatically parses names and values for dynamic columns based on the input data of the source and synchronizes data from the columns to the destination. The following table describes the processing policies that are supported for dynamic columns.

      Processing policy

      Description

      Add Column

      If a destination OSS table does not have a column whose name is the same as a dynamic column, the system automatically adds a column to the destination OSS table and writes the data in the dynamic column to the added column.

      Ignore

      If a destination OSS table does not have a column whose name is the same as a dynamic column, the system ignores the dynamic column and writes the data in other columns that have mapped columns in the source table to the destination OSS table.

      Report Error

      If a destination OSS table does not have a column whose name is the same as a dynamic column, an error is reported, and the synchronization task is stopped.

  6. Configure advanced parameters.

    Click Configure Advanced Parameters in the upper-right corner of the configuration page. In the Configure Advanced Parameters dialog box, configure items such as parallelism and memory resources. You can configure each item based on the data amount of the specified Kafka topic and the number of partitions in the topic. We recommend that you configure the items based on the following instructions:

    • Number of parallel threads used to read data from Kafka = Number of partitions in the Kafka topic.

    • Number of parallel threads used to write data to OSS = Number of partitions in the Kafka topic.

    • Memory size = 1.5 GB + (256 MB × Number of partitions in the Kafka topic).

    • The interval at which checkpoints are triggered, in milliseconds. We recommend that you specify a minute-level interval.

    The performance and resource consumption of a synchronization task are affected by factors such as the data amount of the source and destination, the network environment, and the loads of DataWorks. You can refer to the preceding instructions to change the settings of the items based on your business requirements.

  7. Configure alert rules.

    To identify and respond to exceptions that occur on the synchronization task at the earliest opportunity, you can configure different alert rules for the synchronization task.

    1. Click Configure Alert Rule in the upper-right corner of the configuration page to configure an alert rule for a real-time synchronization subtask that will be generated by the synchronization task.

    2. Click Configure Alert Rule.

      For more information about how to configure an alert rule, see Best practices for configuring an alert rule for real-time synchronization nodes.

      Note

      If you set the Alert Reason parameter to DDL Notification, you can select only AddColumn for the Applicable DDL parameter. If the system detects that a dynamic column is generated by a data processing node, an error is triggered. The trigger condition is not the operation of adding a column to a destination OSS table.新增报警

    3. Manage alert rules.

      You can enable or disable an alert rule. You can also specify different alert recipients based on the severity level of alerts.报警规则

  8. Configure a resource group.

    Click Configure Resource Group in the upper-right corner of the configuration page and configure the exclusive resource group for Data Integration that you want to use to run the synchronization task.

  9. Perform a test on the synchronization task.

    After the preceding configuration is complete, click Perform Simulated Running in the upper-right corner of the configuration page to enable the synchronization task to synchronize the sampled data to the destination OSS table. You can view the synchronization result in the destination OSS table. If specific configurations of the synchronization task are invalid, an exception occurs during the test run, or dirty data is generated, the system reports an error in real time. This can help you check the configurations of the synchronization task and determine whether expected results can be obtained at the earliest opportunity.

    1. Click Perform Simulated Running in the upper-right corner of the configuration page. In the dialog box that appears, configure the parameters for data sampling from the specified Kafka topic, including the Start At and Sampled Data Records parameters.

    2. Click Start Collection to enable the synchronization task to sample data from the specified Kafka topic.

    3. Click Preview to enable the synchronization task to synchronize the sampled data to the destination OSS table.

  10. After the preceding configuration is complete, click Complete.

Perform O&M on the synchronization task

Start the synchronization task

After you complete the configuration of the synchronization task, you are navigated to the Nodes page. You can find the created synchronization task and click Start in the Actions column to start the synchronization task.

View the running status of the synchronization task

After you complete the configuration of the synchronization task, you can find the task on the Nodes page, and click the task name or click Running Details in the Actions column to view the running details of the task. The running details page displays the following information about the synchronization task:

  • Basic information: You can view the basic information about the synchronization task, such as the data sources and resource group.

  • Running status: The synchronization task has the following stages: schema migration and real-time synchronization. You can view the running status of the synchronization task in each stage.

  • Details: You can view the details of the synchronization task in the schema migration stage and real-time synchronization stage on the Schema Migration tab and the Real-time Synchronization tab.

    • Schema Migration: This tab displays information such as the generation methods of destination tables. The generation methods of destination tables include Use Existing Table and Create Table. If the generation method of a destination table is Create Table, the DDL statement that is used to create the table is displayed.

    • Real-time Synchronization: This tab displays statistics about real-time synchronization, including real-time synchronization details, DDL records, and alert information.

Rerun the synchronization task

  • Directly rerun the synchronization task

    Find the synchronization task on the Nodes page and choose More > Rerun in the Actions column to rerun the synchronization task without modifying the configurations of the synchronization task.

  • Modify the configurations of the synchronization task and then rerun the synchronization task

    Find the synchronization task on the Nodes page, modify the configurations of the synchronization task, and then click Complete. Click Apply Updates that is displayed in the Actions column of the synchronization task to rerun the synchronization task for the latest configurations to take effect.