Fully managed Flink allows you to ingest data into data warehouses or data lakes in real time. This topic describes how to build a job that synchronizes data from MySQL to Hologres in the console of fully managed Flink.

Background information

For example, a MySQL instance has four databases named tpc_ds, user_db1, user_db2, and user_db3. The tpc_ds database contains 24 business tables that have different table schemas. Sharding is performed on the databases user_db1, user_db2, and user_db3. Each of the three databases contains three tables that have the same table schema, and the three databases have a total of nine tables named user01 to user09. The following figure shows the databases and tables of the MySQL instance that you can view in the Alibaba Cloud Database Management Service (DMS) console.
If you want to synchronize the tables and data from the databases of the MySQL instance to Hologres, perform the steps in this topic. The tables named user01 to user09 are merged, and their data is synchronized to a single Hologres table.

In this topic, the CREATE TABLE AS and CREATE DATABASE AS statements that are provided by fully managed Flink are used to synchronize data in an entire database, merge and synchronize tables in a sharded database, synchronize full and incremental data, and synchronize table schema changes in real time.

Prerequisites

Prepare test data

  1. Click tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql to download test data to your on-premises computer.
  2. In the DMS console, prepare the test data of the ApsaraDB RDS for MySQL instance.
    1. Log on to the ApsaraDB RDS for MySQL instance from the DMS console.
    2. On the SQLConsole tab, enter the following commands and click Execute.
      The following commands are used to create the tpc_ds, user_db1, user_db2, and user_db3 databases.
      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. In the top navigation bar of the DMS console, click Data Import.
    4. Click Large Data Import.
    5. Select the database whose data you want to import in the Database field, click Upload a file to upload the SQL file of the selected database, and then click Submit. After the file passes the precheck, submit a ticket. After the ticket is approved, click Execute Change.
      Repeat this step to import data files to the tpc_ds, user_db1, user_db2, and user_db3 databases in sequence.
  3. Log on to the Hologres console and create the my_user database to store data that is obtained after the tables user01 to user09 are merged.
    For more information about how to create a database, see Create a database.

Configure IP address whitelists

To allow fully managed Flink to access MySQL and Hologres instances, you must add the Classless Inter-Domain Routing (CIDR) block of the vSwitch to which the fully managed Flink instance belongs to the whitelists of MySQL and Hologres.

  1. Obtain the CIDR block of the vSwitch to which the fully managed Flink instance belongs.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage, click More in the Actions column, and then select Workspace Details.
    3. In the Workspace Details dialog box, view the CIDR block of the vSwitch to which the fully managed Flink instance belongs.
  2. Add the CIDR block of the vSwitch to which the fully managed Flink instance belongs to the IP address whitelist of the ApsaraDB RDS for MySQL instance.
  3. Add the CIDR block of the vSwitch to which the fully managed Flink instance belongs to the IP address whitelist of the Hologres instance.
    For more information, see Configure IP address whitelists.

Step 1: Create catalogs

If you want to synchronize an entire database, merge and synchronize tables in a sharded database, or synchronize a single table, you must create a destination catalog. You must also create a source catalog to obtain a list of source tables and information about the source tables. You can create the source and destination catalogs in the console of fully managed Flink. In this example, the source catalog is a MySQL catalog and the destination catalog is a Hologres catalog.
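If you prefer SQL to the console UI, catalogs of this kind can usually also be created with CREATE CATALOG statements in the SQL editor of fully managed Flink. The following sketch is only an illustration: the option names follow the catalog documentation, and all endpoints and credentials are placeholders that you must replace with the values of your own instances.

```sql
-- Sketch only: endpoints and credentials are placeholders.
CREATE CATALOG `mysql` WITH (
  'type' = 'mysql',
  'hostname' = '<rds-internal-endpoint>',   -- internal endpoint of the RDS instance
  'port' = '3306',
  'username' = '<mysql-username>',
  'password' = '<mysql-password>',
  'default-database' = 'tpc_ds'
);

CREATE CATALOG `holo` WITH (
  'type' = 'hologres',
  'endpoint' = '<hologres-vpc-endpoint>',   -- VPC endpoint of the Hologres instance
  'username' = '<access-key-id>',
  'password' = '<access-key-secret>',
  'dbname' = 'my_user'
);
```

After the statements are executed, the two catalogs appear on the Schemas tab.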

  1. Create a MySQL catalog named mysql.
    For more information, see Configure a MySQL catalog.
  2. Create a Hologres catalog named holo.
    For more information, see Create a Hologres catalog.
  3. On the Schemas tab, confirm that the mysql and holo catalogs are created.

Step 2: Develop a data synchronization job

  1. Log on to the console of fully managed Flink and create a job.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click New.
    5. In the New Draft dialog box, configure the parameters of the job. The following table describes the parameters.
      Name (example: flink-test): The name of the job. Note: The job name must be unique in the current project.
      Type (example: STREAM / SQL): A data synchronization job can only be of the STREAM / SQL type.
      Deployment Target (example: vvp-workload): The name of the fully managed Flink cluster in which you want to deploy the job.
      Locate (example: Development): The folder in which the code file of the job is stored. By default, the code file of the job is stored in the Development folder. You can click the New Folder icon to the right of a folder to create a subfolder.

    6. Click OK.
  2. Copy the following job code to the code editor.
    The following sample code shows how to synchronize all tables in the tpc_ds database of MySQL to the tpc_ds database of Hologres, and then merge and synchronize tables user01 to user09 to the my_user.users table of Hologres.
    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- Synchronize all tables in the tpc_ds database of MySQL to the tpc_ds database of Hologres. 
    CREATE DATABASE IF NOT EXISTS tpc_ds
    AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    -- Synchronize tables user01 to user09 to the my_user.users table of Hologres. 
    CREATE TABLE IF NOT EXISTS my_user.users
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
    The CREATE DATABASE AS statement is used to synchronize all tables in the tpc_ds database to Hologres. The CREATE TABLE AS statement is used to synchronize tables user01 to user09 to a single table in Hologres. The STATEMENT SET statement is used to combine and commit the CREATE DATABASE AS and CREATE TABLE AS statements in one job. Fully managed Flink automatically optimizes the source and reuses one source node to read data from multiple MySQL tables. This significantly reduces the number of MySQL connections and data reading load, and improves reading stability.
    Note If you want to synchronize specific tables in a database, you can add INCLUDING TABLE or EXCLUDING TABLE to the CREATE DATABASE AS statement to specify the tables that you want to synchronize. For example, INCLUDING TABLE 'web.*' indicates that only tables whose names start with web in the database need to be synchronized.
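As an illustration of the Note above, a CREATE DATABASE AS statement with a table filter might look as follows. The pattern is a regular expression, and the filter shown here is only a hypothetical example:

```sql
-- Hypothetical example: synchronize only the tables whose names
-- start with web in the tpc_ds database.
CREATE DATABASE IF NOT EXISTS tpc_ds
AS DATABASE mysql.tpc_ds INCLUDING TABLE 'web.*'
/*+ OPTIONS('server-id'='8001-8004') */;
```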

Step 3: Start the job

  1. In the upper-right corner of the Draft Editor page, click Publish.
  2. On the Deployments page in the console of fully managed Flink, find the job that you want to start and click Start in the Actions column.
    In the Deployment Starting Configuration dialog box, click Confirm Running. You can then view the transition from the current state to the desired state and the final result. When the state changes to RUNNING, the job is running normally. You can view the status and information of the job on the Deployments page.

Step 4: View the full data synchronization result

  1. Log on to the Hologres console.
  2. In the left-side navigation pane, click Go to HoloWeb. On the Metadata Management page of HoloWeb, view the 24 tables and table data in the tpc_ds database of the Hologres instance.
  3. On the Metadata Management page, view the schema of the users table in the my_user database.
    The following figure shows the table schema and data after full data synchronization.
    • Table schema

      In the schema of the users table, the _db_name and _table_name columns are added based on the schema of the MySQL source tables. The _db_name column indicates the database name of the data source, and the _table_name column indicates the table name of the data source. The two columns are used as part of the joint primary key to ensure that data is unique after tables in the sharded database are merged.

    • Table data
      In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:
      select * from users order by _db_name,_table_name,id;
      The following figure shows the table data.
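As a sketch, the merged table that the CREATE TABLE AS statement produces is roughly equivalent to the following Hologres DDL. The exact column types are derived from the source tables; the id and name columns shown here are assumptions based on the example data.

```sql
-- Approximate schema of my_user.users after the merge. The two
-- metadata columns extend the source primary key so that rows from
-- different shards remain unique.
CREATE TABLE my_user.users (
  _db_name    TEXT NOT NULL,  -- database name of the source row
  _table_name TEXT NOT NULL,  -- table name of the source row
  id          INT  NOT NULL,
  name        TEXT,
  PRIMARY KEY (_db_name, _table_name, id)
);
```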

Step 5: View the incremental synchronization result

After full data synchronization is complete, the system automatically switches the data synchronization job to the incremental data synchronization phase. No manual intervention is required. You can determine the data synchronization phase of a data synchronization job based on the value of currentEmitEventTimeLag at a specific point in time on the Metrics tab in the console of fully managed Flink.

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, choose Applications > Deployments.
  4. Click the name of the job whose data synchronization phase you want to view.
  5. Click the Metrics tab.
  6. View the chart of currentEmitEventTimeLag to determine the data synchronization phase of the job.
    • If the value of currentEmitEventTimeLag at a point in time is 0, the job runs in the full data synchronization phase.
    • If the value of currentEmitEventTimeLag at a point in time is greater than 0, the job enters the incremental synchronization phase.
  7. Verify the real-time synchronization of data changes and schema changes.
    The MySQL CDC data source allows you to synchronize table data changes and schema changes in real time during incremental data synchronization. After a job enters the incremental data synchronization phase, you can modify the table schema and data in tables user01 to user09 of the MySQL instance to verify the real-time synchronization of data changes and schema changes.
    1. Log on to an ApsaraDB RDS for MySQL instance from the DMS console.
    2. In the user_db2 database, run the following commands to modify the schema of the user02 table, insert data into the user02 table, and update data in the user05 table:
      USE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- Add the age column to the user02 table. 
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- Insert data that includes the age information into the user02 table. 
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- Change a specific value of the name field to uppercase letters. 
    3. In the Hologres console, view the changes in the schema and data of the users table.
      In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:
      select * from users order by _db_name,_table_name,id;
      The following figure shows the table data. Although the schemas of the tables in the sharded database now differ, the schema and data changes of the user02 table and the data change of the user05 table are synchronized to the destination table in real time. In the users table of Hologres, the age column is added, the row for Tony that includes age data is inserted, and the name JARK is displayed in uppercase letters.

(Optional) Step 6: Configure resources for the job

To optimize job performance, we recommend that you adjust the parallelism of jobs and resource configurations of different nodes based on the amount of data that needs to be processed. You can use the basic resource configuration mode to configure the parallelism of jobs and the number of CUs in a simplified manner. You can also use the expert resource configuration mode to adjust the parallelism of jobs and resource configurations of nodes in a fine-grained manner.

  1. On the Deployments page, click the name of the job whose resources you want to configure. In the upper-right corner of the job details page, click Configure.
  2. On the right side of the Draft Editor page, click the Resources tab.
  3. In the Resource Configuration panel, set Configuration Mode to Expert.
  4. Click Get Plan Now in the Resource Plan section.
  5. Click Expand All.
    View the complete topology to learn the data synchronization plan of the job. The plan shows the tables that need to be synchronized.
  6. Manually configure PARALLELISM for each node.
    Set PARALLELISM to 4 for all sink nodes except the holo.tpc_ds.store_sales node. The store_sales table in the tpc_ds database contains the largest amount of data. To improve the performance of data writing to Hologres, you can set PARALLELISM to 8 for the holo.tpc_ds.store_sales node. For more information about resource configurations, see Configure resources in expert mode. The following figure shows the resource configuration plan of the job after adjustment.
  7. Click Save Plan.
  8. Click Publish.
  9. On the Deployments page, view the effect after the adjustment.

FAQ

  • Q: Why does the schema of the destination table remain unchanged after I modify the table schema of the MySQL instance?

    A: The synchronization of table schema changes is not triggered by specific DDL statements but by the schema difference between the data records before and after the schema change. If DDL statements are executed but no data is inserted or modified in the source table, no change is triggered in the destination table. For more information, see Synchronization policies of table schema changes.
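For example, assuming a table named user03 that is already being synchronized, the following hypothetical sequence illustrates the behavior: the ALTER statement alone does not change the destination table, and the schema change is applied only when a data record with the new schema arrives.

```sql
-- Run on the MySQL side. The ALTER alone is not propagated yet.
ALTER TABLE `user03` ADD COLUMN `email` VARCHAR(255);
-- The INSERT carries the new schema, which triggers both the schema
-- change and the new row in the destination table.
INSERT INTO `user03` (id, name, email) VALUES (99, 'Kate', 'kate@example.com');
```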

  • Q: Why does the finish split response timeout error message appear in the source?

    A: This error message appears because the source fails to respond to RPC requests of the coordinator due to high CPU utilization of tasks. In this case, you must increase the number of CPU cores of TaskManager on the Resources tab in the console of fully managed Flink.

  • Q: What is the impact of a table schema change during full data reading?

    A: If the schema of the source table changes during full data reading, fully managed Flink ignores the schema change. As a result, the schema change is not synchronized to the destination table.

  • Q: What do I do if the data synchronization job fails due to an unsupported schema change to a table?

    A: You must resynchronize the data of the table. To do so, stop the data synchronization job, delete the destination table, and then restart the job. For more information, see Limits on CREATE TABLE AS and Limits on CREATE DATABASE AS.
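For example, to resynchronize the merged table from this topic, the cleanup on the Hologres side might be sketched as follows. Run it only after the synchronization job is stopped, because it deletes the destination data.

```sql
-- Run on the Hologres side after the job is stopped.
DROP TABLE IF EXISTS my_user.users;
-- Then restart the job without states. The CREATE TABLE AS statement
-- recreates the table and performs a new full synchronization.
```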
