All Products
Search
Document Center

DataWorks:Create a real-time ETL synchronization task to synchronize data from Kafka to Hologres

Last Updated:Feb 01, 2024

DataWorks allows you to use a real-time extract, transform, and load (ETL) synchronization task to synchronize data from Kafka to Hologres. The task initializes the schema of a destination Hologres table based on the structure of a specified Kafka topic. Then, the task synchronizes full data from the specified Kafka topic to the destination Hologres table at a time and synchronizes incremental data from the topic to the destination Hologres table in real time. This topic describes how to create a real-time ETL synchronization task to synchronize data from Kafka to Hologres.

Limits

Prepare data sources

Prepare a Kafka data source

Add a Kafka data source to DataWorks. For more information, see Kafka data source.

Prepare a Hologres data source

  • Obtain information about the Hologres instance that you want to add to DataWorks as a data source

    Log on to the Hologres console. In the left-side navigation pane, click Instances. On the Instances page, find the Hologres instance that you want to add to DataWorks as a data source and click the instance name. On the Instance Details page, obtain the following information about the Hologres instance: instance ID, region, and endpoint. If the virtual private cloud (VPC) network type is enabled for the Hologres instance, you can also obtain the VPC ID and vSwitch ID.

  • Prepare a Hologres data source

    Associate a Hologres compute engine with the desired workspace to enable the system to generate a Hologres data source. Alternatively, directly add a Hologres data source to the desired workspace. For more information, see Associate a Hologres compute engine with a workspace or Add a Hologres data source.

Create an exclusive resource group for Data Integration and establish network connections between the resource group and data sources

Before data synchronization, you must establish network connections between the exclusive resource group for Data Integration and the data sources. For more information, see Configure network connectivity.

Note

Kafka and Hologres support the following network types:

  • Kafka: Internet or a specified VPC

  • Hologres: Internet, a specified VPC, or AnyTunnel

  • If the exclusive resource group for Data Integration and the Hologres or Kafka data source reside in the same region, you can use a VPC in the region based on AnyTunnel or SingleTunnel to establish a network connection between the resource group and data source. If the network type of the Hologres or Kafka data source is SingleTunnel, you must perform the following steps to establish a network connection:

    1. Associate the exclusive resource group for Data Integration with a VPC and add a custom route for the resource group.

    2. Add the required IP address or CIDR block to the IP address whitelist of the Hologres or Kafka data source.

  • If the exclusive resource group for Data Integration and the Hologres or Kafka data source reside in different regions, you can establish a network connection between the resource group and data source over the Internet. To establish such a network connection, you must add the required IP address or CIDR block to the IP address whitelist of the data source.

Step 1: Associate the exclusive resource group for Data Integration with a VPC and add a custom route for the resource group

Note

If you want to establish a network connection between the exclusive resource group for Data Integration and a data source over the Internet, you can skip this step.

  1. Associate the exclusive resource group for Data Integration with a VPC.

    1. Go to the Resource Groups page in the DataWorks console, find the exclusive resource group for Data Integration that you want to use, and then click Network Settings in the Actions column.

    2. On the VPC Binding tab of the page that appears, click Add Binding. In the Add VPC Binding panel, configure the following parameters to associate the resource group with a VPC:

      • VPC: Select the VPC in which the data source resides from the VPC drop-down list.

      • Zone and VSwitch: Preferentially select the zone and vSwitch in which the data source resides. If the zone in which the data source resides is not displayed in the Zone drop-down list, select a random zone and a random vSwitch. However, you must make sure that the VPC to which the selected vSwitch belongs can connect to the VPC in which the data source resides.

      • Security Groups: Select a security group based on your business requirements. The selected security group must meet the following requirements:

        • An inbound rule in the security group allows access from the HTTP ports of the Kafka data source that range from 9092 to 9094. You can log on to the ECS console to view the security group.

        • The CIDR block of the vSwitch with which the exclusive resource group for Data Integration is associated is specified as an authorization object in the inbound rule.

  2. Add a custom route for the exclusive resource group for Data Integration.

    Note

    If you select the zone and vSwitch in which the data source resides in the preceding substep, you can skip this substep. If you select another zone and another vSwitch, you must perform operations in this substep to add a custom route for the exclusive resource group for Data Integration.

    1. Go to the Resource Groups page in the DataWorks console, find the exclusive resource group for Data Integration that you want to use, and then click Network Settings in the Actions column.

    2. On the VPC Binding tab of the page that appears, find the VPC association record and click Custom Route in the Actions column.

    3. In the Custom Route panel, click Add Route. In the Add Route dialog box, configure the following parameters to add a custom route for the exclusive resource group for Data Integration:

      • Destination VPC: Select the region and VPC in which the data source resides.

      • Destination VSwitch: Select the vSwitch in which the data source resides.

Step 2: Configure the IP address whitelist of a data source

  1. Obtain the required IP address or CIDR block. In this subsection, an ApsaraMQ for Kafka data source is used.

    • If you establish a network connection between the exclusive resource group for Data Integration and the data source over a VPC, you must add the CIDR block of the vSwitch that you specify when you associate the resource group with the VPC to the IP address whitelist of the data source. You can find the resource group on the Exclusive Resource Groups tab of the Resource Groups page in the DataWorks console and click Network Settings in the Actions column to view the CIDR block of the vSwitch.独享绑定的交换机网段

    • If you establish a network connection between the exclusive resource group for Data Integration and the data source over the Internet, you must add the elastic IP address (EIP) of the resource group to the IP address whitelist of the data source. You can find the resource group on the Exclusive Resource Groups tab of the Resource Groups page in the DataWorks console and click View Information in the Actions column to view the EIP of the resource group.查看独享资源组EIP

  2. Add the required IP address or CIDR block to the IP address whitelists of the data sources.

    1. Add the IP address or CIDR block to the IP address whitelist of the ApsaraMQ for Kafka data source.

      Log on to the ApsaraMQ for Kafka console. Find the ApsaraMQ for Kafka data source that you want to use and go to the Whitelist Management page of the data source. On this page, click Create Whitelist to add the IP address or CIDR block to the IP address whitelist.

    2. Add the IP address or CIDR block to the IP address whitelist of the Hologres data source.

      Log on to the HoloWeb console. Find the Hologres data source to which you want to write data and go to the Security Center tab. In the left-side navigation pane, click IP Address Whitelist. On the IP Address Whitelist page, click Add IP Address to Whitelist. In the Add IP Address to Whitelist dialog box, enter the IP address or CIDR block that you want to add in the IP Address field and click OK.

Create and configure a synchronization task

  1. Go to the Data Integration page of the DataWorks console. The Data Integration page appears. On the Data Integration page, select a source type and a destination type and click Create.

  2. In the Basic Settings section and the Network and Resource Configuration section of the Create Data Synchronization Solution page, configure basic information and a resource group for the synchronization task.

    1. New Node Name: Specify a name for the synchronization task based on your business requirements.

    2. Synchronization Method: Select Real-time synchronization of single table.

    3. Network and Resource Configuration: In this section, select the Kafka data source, Hologres data source, and exclusive resource group for Data Integration that you prepared. Then, click Test Connectivity for All Resource Groups and Data Sources to test the network connectivity between the resource group and the data sources.

  3. Configure the Kafka data source.

    Click KafkaSource in the wizard of the upper part of the configuration page and configure the Kafka data source.

    1. Configure basic information for the Kafka data source.

      • Select the Kafka topic from which you want to synchronize data.

      • Retain default values for other parameters, or modify their configurations based on your business requirements.

    2. Click Data Sampling.

      In the dialog box that appears, configure the Start At and Sampled Data Records parameters and click Start Collection. The system samples data from the Kafka topic that you specified. You can preview the data in the Kafka topic. The data in the Kafka topic is used as input data for data preview and visualization configurations of a data processing node.

  4. Configure a data processing node.

    Click the 添加 icon to add a data processing method. The following data processing methods are supported: Data Masking Rule, String Replacement Rule, Filter Conditions, JSON Parsing, and Edit Field and Assign Value. You can arrange the data processing methods based on your business requirements. When the synchronization task is run, data is processed based on the processing order that you specify. 数据处理After you configure a data processing node, you can click Preview Data Output in the upper-right corner of the configuration page. In the Preview Data Output dialog box, you can click Re-obtain Output of Ancestor Node to enable the data processing node to process the data that is sampled from the specified Kafka topic and preview the processing result.

    In the Preview Data Output dialog box, you can change the input data or click Manually Construct Data to customize the input data. Then, you can click Preview to preview the result generated after the input data is processed by the data processing node. If an exception occurs on the data processing node or dirty data is generated, the system reports an error in real time. This can help you check the configurations of the data processing node and determine whether expected results can be obtained at the earliest opportunity.

    Note

    Before you preview the result generated after the input data is processed by a data processing node, you must configure data sampling settings for the Kafka data source.

    输出预览

  5. Configure the Hologres data source.

    Click Hologres in the wizard of the upper part of the configuration page and configure the Hologres data source.

    1. In the Basic Information section, configure the parameters.

      • Select the Hologres schema to which you want to write data.

      • Configure the Destination Table parameter. You can select Create Table or Use Existing Table.

      • Enter a table name or select a table name from the Table Name drop-down list.

    2. Edit the schema for the destination Hologres table that will be automatically created if you set Destination Table to Create Table.

      If you select Create Table for the Destination Table parameter, click Edit Table Schema. In the dialog box that appears, edit the schema for the destination Hologres table that will be automatically created. You can also click Re-generate Table Schema Based on Output Column of Ancestor Node to re-generate a table schema based on the output columns of an ancestor node. You can select a column from the generated table schema and configure the column as the primary key.

      Note

      The destination Hologres table must have a primary key. Otherwise, the configurations cannot be saved.

    3. Configure mappings between fields in the source and fields in the destination.

      After you configure basic information for the Hologres data source with the Destination Table parameter set to Use Existing Table or save the table schema settings, the system automatically establishes mappings between columns in the source and columns in the destination based on the same-name mapping principle. You can modify the mappings based on your business requirements. One column in the source can map to multiple columns in the destination. Multiple columns in the source cannot map to the same column in the destination. If a column in the source has no mapped column in the destination, data in the column in the source is not synchronized to the destination.

    4. Configure processing policies for dynamic columns generated by an ancestor data processing node.

      Processing policies for dynamic columns generated by a data processing node are used to control the processing methods of dynamic columns that are generated by an ancestor data processing node. Only JSON parsing nodes can generate dynamic columns. If you make configurations in the Dynamic Output Field section when you configure a JSON parsing node, you must configure processing policies for dynamic columns that are generated by the JSON parsing node.

      Dynamic columns refer to columns that do not have fixed column names. The synchronization task automatically parses names and values for dynamic columns based on the input data of the source and synchronizes data from the columns to the destination. The following table describes the processing policies that are supported for dynamic columns.

      Processing policy

      Description

      Add Column

      If a destination Hologres table does not have a column whose name is the same as a dynamic column, the system automatically adds a column to the destination Hologres table and writes the data in the dynamic column to the added column.

      Ignoring

      If a destination Hologres table does not have a column whose name is the same as a dynamic column, the system ignores the dynamic column and writes the data in other columns that have mapped columns in the source table to the destination Hologres table.

      Report Error

      If a destination Hologres table does not have a column whose name is the same as a dynamic column, an error is reported, and the synchronization task is stopped.

  6. Configure advanced parameters.

    Click Configure Advanced Parameters in the upper-right corner of the configuration page. In the Configure Advanced Parameters dialog box, configure items such as parallelism and memory resources. You can configure each item based on the data amount of the specified Kafka topic and the number of partitions in the topic. We recommend that you configure the items based on the following instructions:

    • Number of parallel threads used to read data from Kafka = Number of partitions in the Kafka topic

    • Number of parallel threads used to write data to Hologres = Number of partitions in the Kafka topic

    • Memory size = 1.5 GB + (256 MB × Number of partitions in the Kafka topic)

    • Condition for enabling the distributed execution mode: more than 12 partitions contained in the Kafka topic

    • Number of subtasks = Rounding up the quotient of the number of partitions in the Kafka topic divided by 6

    • Number of parallel threads used to read data from Kafka for a single subtask = 6

    • Number of parallel threads used to write data to Hologres for a single subtask = 6

    • Size of memory that can be occupied by a single subtask = 2 GB

    The performance and resource consumption of a synchronization task are affected by factors such as the data amount of the source and destination, the network environment, and the loads of DataWorks. You can refer to the preceding instructions to change the settings of the items based on your business requirements.

  7. Configure alert rules.

    To identify and respond to exceptions that occur on the synchronization task at the earliest opportunity, you can configure different alert rules for the synchronization task.

    1. Click Configure Alert Rule in the upper-right corner of the configuration page to configure an alert rule for a real-time synchronization subtask that will be generated by the synchronization task.

    2. Click Configure Alert Rule.

      For more information about how to configure an alert rule, see Best practices for configuring an alert rule for real-time synchronization nodes.

      Note

      If you set the Alert Reason parameter to DDL Notification, you can select only Add Column for the Applicable DDL parameter. If the system detects that a dynamic column is generated by a data processing node, an error is triggered. The trigger condition is not the operation of adding a column to a destination Hologres table.新增报警

    3. Manage alert rules.

      You can enable or disable an alert rule. You can also specify different alert recipients based on the severity level of alerts.报警规则

  8. Configure a resource group.

    Click Configure Resource Group in the upper-right corner of the configuration page and configure the exclusive resource group for Data Integration that you want to use to run the synchronization task.

  9. Perform a test on the synchronization task.

    After the preceding configuration is complete, click Perform Simulated Running in the upper-right corner of the configuration page to enable the synchronization task to synchronize the sampled data to the destination Hologres table. You can view the synchronization result in the destination Hologres table. If specific configurations of the synchronization task are invalid, an exception occurs during the test run, or dirty data is generated, the system reports an error in real time. This can help you check the configurations of the synchronization task and determine whether expected results can be obtained at the earliest opportunity.

    1. Click Perform Simulated Running in the upper-right corner of the configuration page. In the dialog box that appears, configure the parameters for data sampling from the specified Kafka topic, including the Start At and Sampled Data Records parameters.

    2. Click Start Collection to enable the synchronization task to sample data from the specified Kafka topic.

    3. Click Preview to enable the synchronization task to synchronize the sampled data to the destination Hologres table.

After the preceding configuration is complete, click Complete Configuration.

Perform O&M on the synchronization task

Start the synchronization task

After you complete the configuration of the synchronization task, you are navigated to the Data Integration page. You can find the created synchronization task and click Start in the Actions column to start the synchronization task.

View the running status of the synchronization task

After you complete the configuration of the synchronization task, you can find the task on the Data Integration page, and click the task name or click Running Details in the Actions column to view the running details of the task. The running details page displays the following information about the synchronization task.

  • Basic information: You can view the basic information about the synchronization task, such as the data sources and resource group.

  • Running status: The synchronization task has the following stages: schema migration and real-time synchronization. You can view the running status of the synchronization task in each stage.

  • Details: You can view the details of the synchronization task in the schema migration stage and real-time synchronization stage on the Schema Migration tab and the Real-time Synchronization tab.

    • Schema Migration: This tab displays information such as the generation methods of destination tables. The generation methods of destination tables include Use Existing Table and Create Table. If the generation method of a destination table is Create Table, the DDL statement that is used to create the table is displayed.

    • Real-time Synchronization: This tab displays statistics about real-time synchronization, including real-time read and write traffic, dirty data information, failovers, and run logs.

Rerun the synchronization task

  • Directly rerun the synchronization task

    Find the synchronization task in the Nodes section of the Data Integration page and choose More > Rerun in the Actions column to rerun the synchronization task without modifying the configurations of the synchronization task.

  • Modify the configurations of the synchronization task and then rerun the synchronization task

    Find the synchronization task in the Nodes section of the Data Integration page, modify the configurations of the synchronization task, and then click Complete. Click Apply Updates that is displayed in the Actions column of the synchronization task to rerun the synchronization task for the latest configurations to take effect.