Realtime Compute for Apache Flink: Ingest data into data warehouses in real time

Last Updated: Jan 15, 2024

Realtime Compute for Apache Flink allows you to ingest data into data warehouses in real time. It provides features such as automatic switching between full and incremental data synchronization, automatic metadata discovery, synchronization of table schema changes, and synchronization of entire databases. These features simplify real-time data ingestion into data warehouses and make real-time data synchronization more efficient and convenient. This topic describes how to develop a draft in the console of fully managed Flink that synchronizes data from ApsaraDB RDS for MySQL to Hologres.

Background information

For example, an ApsaraDB RDS for MySQL instance has four databases named tpc_ds, user_db1, user_db2, and user_db3. The tpc_ds database contains 24 business tables that have different table schemas. The databases user_db1, user_db2, and user_db3 are sharded databases. Each of the three databases contains three tables that share the same table schema, for a total of nine tables named user01 to user09. The following figure shows the databases and tables of the ApsaraDB RDS for MySQL instance as displayed in the Alibaba Cloud Database Management Service (DMS) console.

[Figure: Databases and tables]

To develop a draft that synchronizes these tables and their data from the ApsaraDB RDS for MySQL instance to Hologres, perform the steps in this topic. The tables named user01 to user09 are merged and their data is synchronized to a single Hologres table.

In this topic, the CREATE TABLE AS statement and the CREATE DATABASE AS statement that are supported by fully managed Flink are used to synchronize data in an entire database, merge and synchronize tables in a sharded database, synchronize full and incremental data, and synchronize table schema changes in real time.

Prerequisites

Prepare test data

  1. Click tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql to download test data to your on-premises computer.

  2. In the DMS console, prepare the test data of the ApsaraDB RDS for MySQL instance.

    1. Log on to an ApsaraDB RDS for MySQL instance from the DMS console.

    2. On the SQLConsole tab, enter the following commands and click Execute.

      The following commands are used to create the tpc_ds, user_db1, user_db2, and user_db3 databases.

      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;

    3. In the top navigation bar, click Data Import. On the page that appears, click the Large Data Import tab.

    4. On the Large Data Import tab, select the database whose data you want to import in the Database field, click Upload a file to upload the SQL file of the selected database, and then click Submit. After the file passes the precheck, submit a ticket. After the ticket is approved, click Execute Change. In the dialog box that appears, click Confirm Execution.

      Repeat this step to import data files to the tpc_ds, user_db1, user_db2, and user_db3 databases in sequence.

      [Figure: Import data]

  3. Log on to the Hologres console and create the my_user database to store data that is obtained after the tables user01 to user09 are merged.

    For more information about how to create a database, see Create a database.
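
As an optional sanity check on the import in step 2, a query such as the following can be run on the SQLConsole tab of the DMS console. This is a minimal sketch that assumes your database account can read information_schema. Based on the layout described in the Background information section, you would expect 24 tables in tpc_ds and 3 tables in each of the user_db1, user_db2, and user_db3 databases.

```sql
-- Count the imported tables per database.
-- Run this against the ApsaraDB RDS for MySQL instance, not against Hologres.
SELECT table_schema, COUNT(*) AS table_count
FROM information_schema.tables
WHERE table_schema IN ('tpc_ds', 'user_db1', 'user_db2', 'user_db3')
  AND table_type = 'BASE TABLE'
GROUP BY table_schema
ORDER BY table_schema;
```

If a database is missing from the result or has fewer tables than expected, repeat the import for that database before you continue.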

Configure IP address whitelists

To allow fully managed Flink to access ApsaraDB RDS for MySQL and Hologres instances, you must add the Classless Inter-Domain Routing (CIDR) block of the vSwitch to which the fully managed Flink workspace belongs to the whitelists of ApsaraDB RDS for MySQL and Hologres.

  1. Obtain the CIDR block of the vSwitch to which the fully managed Flink workspace belongs.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the desired workspace and choose More > Workspace Details in the Actions column.

    3. In the Workspace Details dialog box, view the CIDR block of the vSwitch to which the fully managed Flink workspace belongs.

      [Figure: CIDR block information]

  2. Add the CIDR block of the vSwitch to which the fully managed Flink workspace belongs to the IP address whitelist of the ApsaraDB RDS for MySQL instance.

  3. Add the CIDR block of the vSwitch to which the fully managed Flink workspace belongs to the IP address whitelist of the Hologres instance.

    To configure an IP address whitelist for an instance in the HoloWeb console, you must set the Logon Method parameter to Password-free Logon when you set up a connection to the instance. For more information, see Configure an IP address whitelist.

    [Figure: Hologres IP address whitelist]

Step 1: Create catalogs

If you want to synchronize an entire database, merge and synchronize tables in a sharded database, or synchronize a single table, you must create a destination catalog. You must also create a source catalog to obtain a list of source tables and information about the source tables. You can create the source and destination catalogs in the console of fully managed Flink. In this example, the source catalog is an ApsaraDB RDS for MySQL catalog and the destination catalog is a Hologres catalog.

  1. Create an ApsaraDB RDS for MySQL catalog named mysql.

    For more information, see Configure a MySQL catalog.

    [Figure: MySQL catalog]

  2. Create a Hologres catalog named holo.

    For more information, see Create a Hologres catalog.

    [Figure: Hologres catalog]

  3. In the left-side navigation pane of the console of fully managed Flink, click Catalogs. On the Catalog List page, check whether the catalogs mysql and holo are created.
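
The check in step 3 can also be sketched in SQL. The statements below are standard Apache Flink SQL. Whether the SQL editor in your workspace can execute them as a script is an assumption, and the output depends on your instances.

```sql
-- List the catalogs registered in the workspace; mysql and holo should appear.
SHOW CATALOGS;

-- Inspect the databases that the source catalog exposes.
USE CATALOG mysql;
SHOW DATABASES;

-- Inspect the databases that the destination catalog exposes.
USE CATALOG holo;
SHOW DATABASES;
```

If the mysql catalog does not list tpc_ds, user_db1, user_db2, and user_db3, or the holo catalog does not list my_user, revisit the preparation steps before you develop the draft.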

Step 2: Develop a data synchronization draft

  1. Log on to the console of fully managed Flink and create a draft.

    1. In the left-side navigation pane, click SQL Editor. In the upper-left corner of the SQL Editor page, click New.

    2. On the SQL Scripts tab of the New Draft dialog box, click Blank Stream Draft.

      Fully managed Flink provides various code templates and data synchronization templates. Each template describes its applicable scenarios and provides code samples and instructions. You can click a template to learn about the features and related syntax of Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.

    3. Click Next.

    4. In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.

      | Parameter | Description | Example |
      | --- | --- | --- |
      | Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | flink-test |
      | Location | The folder in which the code file of the draft is stored. You can click the New Folder icon to the right of a folder to create a subfolder. | Development |
      | Engine Version | The engine version of Flink that the draft uses. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-6.0.4-flink-1.15 |

    5. Click Create.

  2. Copy the following code of a draft to the code editor.

    The following sample code shows how to synchronize all tables in the tpc_ds database of ApsaraDB RDS for MySQL to the tpc_ds database of Hologres, and then merge and synchronize tables user01 to user09 to the my_user.users table of Hologres.

    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- Synchronize all tables in the tpc_ds database of ApsaraDB RDS for MySQL to the tpc_ds database of Hologres. 
    CREATE DATABASE IF NOT EXISTS tpc_ds
    AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    -- Synchronize tables user01 to user09 to the my_user.users table of Hologres. 
    CREATE TABLE IF NOT EXISTS my_user.users
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

    The CREATE DATABASE AS statement is used to synchronize all tables in the tpc_ds database to Hologres. The CREATE TABLE AS statement is used to synchronize tables user01 to user09 to a single table in Hologres. The STATEMENT SET statement is used to combine and commit the CREATE DATABASE AS and CREATE TABLE AS statements in one deployment. Fully managed Flink automatically optimizes the source and reuses one source node to read data from multiple ApsaraDB RDS for MySQL tables. This significantly reduces the number of ApsaraDB RDS for MySQL connections and data reading load, and improves reading stability.
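
    Before the draft is deployed, it can help to preview which source tables the patterns in the CREATE TABLE AS statement are meant to cover. The following MySQL query is a minimal sketch that can be run on the SQLConsole tab of the DMS console. The ^ and $ anchors are an assumption that the patterns are intended to match whole database and table names.

    ```sql
    -- Preview the source tables covered by the patterns user_db[0-9]+ and user[0-9]+.
    -- Run this against the ApsaraDB RDS for MySQL instance.
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema REGEXP '^user_db[0-9]+$'
      AND table_name REGEXP '^user[0-9]+$'
    ORDER BY table_schema, table_name;
    ```

    With the test data described in this topic, the result should list the nine tables user01 to user09 across user_db1, user_db2, and user_db3.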

    Note

    If you want to synchronize specific tables in a database, you can add INCLUDING TABLE or EXCLUDING TABLE to the CREATE DATABASE AS statement to specify the tables that you want to synchronize. For example, INCLUDING TABLE 'web.*' indicates that only tables whose names start with web in the database need to be synchronized.
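
    The note above can be illustrated as follows. This is a sketch rather than a verified draft: the first statement uses the INCLUDING TABLE 'web.*' example from the note, and the commented-out alternative assumes that EXCLUDING TABLE can follow INCLUDING ALL TABLES. Check the CREATE DATABASE AS statement reference for the exact clause order that your engine version supports.

    ```sql
    USE CATALOG holo;

    -- Variant 1: synchronize only the tables whose names start with web.
    CREATE DATABASE IF NOT EXISTS tpc_ds
    AS DATABASE mysql.tpc_ds INCLUDING TABLE 'web.*'
    /*+ OPTIONS('server-id'='8001-8004') */;

    -- Variant 2 (use instead of variant 1): synchronize all tables except store_sales.
    -- CREATE DATABASE IF NOT EXISTS tpc_ds
    -- AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
    -- EXCLUDING TABLE 'store_sales'
    -- /*+ OPTIONS('server-id'='8001-8004') */;
    ```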

Step 3: Start a deployment

  1. In the upper-right corner of the SQL Editor page, click Deploy. In the dialog box that appears, click Confirm.

    [Figure: Deploy]

    Note

    Session clusters are suitable for development and testing in non-production environments. You can deploy or debug drafts in a session cluster to improve the resource utilization of the JobManager and accelerate deployment startup. However, we recommend that you do not deploy production drafts in session clusters because stability issues may occur. For more information, see Configure a development and test environment (session cluster).

  2. On the Deployments page, find the desired deployment and click Start in the Actions column. For more information about how to configure parameters, see Start a deployment.

  3. In the Start Job dialog box, click Start.

    You can view the status and information of the deployment on the Deployments page after the deployment is started.

    [Figure: Deployment status]

Step 4: View the full data synchronization result

  1. Log on to the Hologres console.

  2. In the left-side navigation pane, click Go to HoloWeb. On the Metadata Management page of HoloWeb, view the 24 tables and table data in the tpc_ds database of the Hologres instance.

    [Figure: Table data in Hologres]

  3. On the Metadata Management page, view the schema of the users table in the my_user database.

    The following figures show the table schema and data after full data synchronization.

    • Table schema

      [Figure: Table schema]

      In the schema of the users table, the _db_name and _table_name columns are added based on the schema of the ApsaraDB RDS for MySQL source tables. The _db_name column indicates the database name of the data source, and the _table_name column indicates the table name of the data source. The two columns are used as part of the joint primary key to ensure that data is unique after tables in the sharded database are merged.

    • Table data

      In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:

      select * from users order by _db_name,_table_name,id;

      The following figure shows the table data.

      [Figure: Table data]
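
Because the _db_name and _table_name columns record where each row came from, the merge can also be checked per source table. The following query is a minimal sketch for the same SQL editor in HoloWeb. It uses only the columns described above.

```sql
-- Count the merged rows per source database and source table.
SELECT _db_name, _table_name, COUNT(*) AS row_count
FROM users
GROUP BY _db_name, _table_name
ORDER BY _db_name, _table_name;
```

If every source table contains data, the result should contain one group for each of the nine tables user01 to user09.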

Step 5: View the incremental synchronization result

After full data synchronization is complete, the system automatically switches the data synchronization deployment to the incremental data synchronization phase. No manual intervention is required. You can determine the data synchronization phase of a data synchronization deployment based on the value of currentEmitEventTimeLag at a specific point in time on the Metrics tab in the console of fully managed Flink.

  1. Log on to the Realtime Compute for Apache Flink console.

  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the desired deployment.

  4. Click the Metrics tab.

  5. View the chart of currentEmitEventTimeLag to determine the data synchronization phase of the deployment.

    [Figure: currentEmitEventTimeLag curve]

    • If the value of currentEmitEventTimeLag at a point in time is 0, the deployment is in the full data synchronization phase.

    • If the value of currentEmitEventTimeLag at a point in time is greater than 0, the deployment has entered the incremental data synchronization phase.

  6. Verify the real-time synchronization of data changes and schema changes.

    The MySQL CDC data source allows you to synchronize table data changes and schema changes in real time during incremental data synchronization. After a deployment enters the incremental data synchronization phase, you can modify the table schema and data in tables user01 to user09 of the ApsaraDB RDS for MySQL instance to verify the real-time synchronization of data changes and schema changes.

    1. Log on to an ApsaraDB RDS for MySQL instance from the DMS console.

    2. In the user_db2 database, run the following commands to modify the schema of the user02 table, insert data into the user02 table, and update data in the user05 table:

      USE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- Add the age column to the user02 table. 
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- Insert data that includes the age information into the user02 table. 
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- Change a specific value of the name field to uppercase letters.

    3. In the Hologres console, view the changes in the schema and data of the users table.

      In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:

      select * from users order by _db_name,_table_name,id;

      The following figure shows the table data.

      [Figure: Changes in the table schema and data]

      Although the schemas of the tables in the sharded databases now differ, the changes in the schema and data of the user02 table and the changes in the data of the user05 table are synchronized to the destination table in real time. In the users table of Hologres, the age column is added, the row for Tony includes the age value, and the name JARK is displayed in uppercase letters.
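
To look at only the rows touched by the statements in substep 2, a narrower query can be used in the same SQL editor. This is a minimal sketch that assumes the id, name, and age columns keep their source names in the users table.

```sql
-- Show only the inserted row (user02, id 27) and the updated row (user05, id 15).
SELECT _db_name, _table_name, id, name, age
FROM users
WHERE (_table_name = 'user02' AND id = 27)
   OR (_table_name = 'user05' AND id = 15)
ORDER BY _table_name, id;
```

If the changes were synchronized as described, the user02 row shows the name Tony with an age value of 30, and the user05 row shows the name JARK.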

(Optional) Step 6: Configure resources for the deployment

To optimize deployment performance, we recommend that you adjust the parallelism of deployments and resource configurations of different nodes based on the amount of data that needs to be processed. You can use the basic resource configuration mode to configure the parallelism of deployments and the number of CUs in a simplified manner. You can also use the expert resource configuration mode to adjust the parallelism of deployments and resource configurations of nodes in a fine-grained manner.

  1. In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the desired deployment.

  2. In the upper-right corner of the Resources section on the Configuration tab, click Edit.

  3. Select Expert for the Mode parameter. Then, click Get Plan Now.

  4. Move the pointer over More and click Expand All.

    You can view the complete topology to learn the data synchronization plan of the deployment. The plan shows the tables from which data needs to be synchronized.

  5. Manually configure PARALLELISM for each node.

    Set PARALLELISM to 4 for all sink nodes except the holo.tpc_ds.store_sales node. The store_sales table in the tpc_ds database contains the largest amount of data. To improve the performance of data writing to Hologres, you can set PARALLELISM to 8 for the holo.tpc_ds.store_sales node. For more information about how to configure resource parameters, see Configure a deployment.

  6. In the upper-right corner of the Resources section, click Save.

  7. On the Deployments page, find the desired deployment and click Start in the Actions column.

    Important

    If you want to modify a deployment that is in the RUNNING state, perform the following steps: Find the deployment and click Cancel in the Actions column. When the deployment is in the CANCELLED state, click Start in the Actions column. After the deployment is started, the modification on the resource configuration of the deployment takes effect.

  8. Click the name of the desired deployment. On the Overview tab, view the effect after the adjustment.
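
When you choose PARALLELISM values in step 5, it can help to know which source tables are the largest. The following MySQL query is a rough sketch for the SQLConsole tab of the DMS console. Note that table_rows in information_schema is only an estimate for InnoDB tables, so treat the ranking as approximate.

```sql
-- List the five tables in tpc_ds with the largest estimated row counts.
SELECT table_name, table_rows
FROM information_schema.tables
WHERE table_schema = 'tpc_ds'
ORDER BY table_rows DESC
LIMIT 5;
```

Tables near the top of this list, such as store_sales in this example, are the most likely candidates for a higher sink parallelism.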
