Realtime Compute for Apache Flink: Ingest data into data warehouses in real time

Last Updated: Nov 14, 2025

Realtime Compute for Apache Flink allows you to ingest data into data warehouses in real time. It features seamless full and incremental data synchronization, automatic metadata discovery, schema change replication, and entire-database synchronization, streamlining real-time data ingestion into data warehouses. This topic describes how to create a job that synchronizes data from ApsaraDB RDS for MySQL to Hologres by using Realtime Compute for Apache Flink.

Background

Consider an ApsaraDB RDS for MySQL instance with four databases: tpc_ds, user_db1, user_db2, and user_db3. The tpc_ds database contains 24 tables with varying structures. Each of the user_db1, user_db2, and user_db3 databases contains three tables, all sharing the same structure (referred to as user01 through user09). You can view these databases and tables in the Data Management (DMS) console, as shown in the following figure.

This topic illustrates how to use Flink CDC for data ingestion, covering the synchronization of an entire database, the merging of sharded databases and tables, full and incremental synchronization, and schema changes. For details, see Flink CDC data ingestion job development (public preview).

Prerequisites

Prepare test data

  1. Click tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql to download test data.

  2. In the DMS console, upload the test data to your ApsaraDB RDS for MySQL instance.

    1. Log on to an ApsaraDB RDS for MySQL instance from the DMS console.

    2. On the SQLConsole tab, enter the following commands and click Execute.

      The following commands are used to create the tpc_ds, user_db1, user_db2, and user_db3 databases.

      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. Click Data Import. On the page that appears, click the Large Data Import tab.

    4. On the Large Data Import tab, select the destination database and upload the corresponding SQL file. Click Submit, and then click Execute Change. In the dialog box that appears, click Confirm Execution.

      Repeat this step to import the data files into the tpc_ds, user_db1, user_db2, and user_db3 databases.

  3. Log on to the Hologres console and create the my_user database to receive data.

    For more information, see Create a database.
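
    If you prefer SQL to the console wizard, you can also create the database in the HoloWeb SQL editor. A minimal sketch, assuming your account has the privilege to create databases:

    -- Create the destination database that receives the merged user tables
    CREATE DATABASE my_user;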

Configure IP address whitelists

To allow Realtime Compute for Apache Flink to access ApsaraDB RDS for MySQL and Hologres instances, add the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs to the whitelists of ApsaraDB RDS for MySQL and Hologres.

  1. Obtain the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs.

    1. Log on to the management console of Realtime Compute for Apache Flink.

    2. Find your workspace and choose Details in the Actions column.

    3. View the CIDR block of the vSwitch.

  2. Add the CIDR block of the vSwitch to the IP address whitelist of the ApsaraDB RDS for MySQL instance.

    For more information, see Configure an IP address whitelist.

  3. Add the CIDR block of the vSwitch to the IP address whitelist of the Hologres instance.

    To configure an IP address whitelist for an instance in the HoloWeb console, set the Logon Method parameter to Password-free Logon when you set up a connection to the instance. For more information, see Configure an IP address whitelist.

Step 1: Develop a data synchronization job

  1. Log on to Realtime Compute for Apache Flink's Development Console.

    1. In the left navigation menu, choose Development > Data Ingestion.

    2. Click the + icon.

    3. Select New Draft.

    4. In the New Draft dialog box, enter a name and choose the engine version:

      Name: The name of the draft to create. Note that the name must be unique in the current namespace. Example: flink-test

      Engine Version: The engine version that the draft uses. For more information, see Engine version. Example: vvr-11.1-jdk11-flink-1.20

    5. Click Create.

  2. Copy the following draft code to the code editor.

    The following sample code shows how to synchronize all tables in the tpc_ds database of ApsaraDB RDS for MySQL to the tpc_ds database of Hologres, and then merge and synchronize tables user01 through user09 to the my_user.users table of Hologres. Sample code:

    source:
      type: mysql
      name: MySQL Source
      hostname: localhost
      port: 3306
      username: username
      password: password
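      # tables takes comma-separated database.table patterns with regular-expression support:
      # tpc_ds.\.* captures every table in the tpc_ds database, and user_db[0-9]+.user[0-9]+
      # captures the sharded user tables in user_db1, user_db2, and user_db3.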
      tables: tpc_ds.\.*,user_db[0-9]+.user[0-9]+
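      # server-id is a range of unique MySQL server IDs; provide at least one ID per parallel source task.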
      server-id: 8601-8604
      # (optional) Sync comments
      include-comments.enabled: true
      # (optional) Distribute unbounded chunks first to avoid TM OOMs
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (optional) Deserialize the changelog events for tables explicitly captured
      scan.only.deserialize.captured.tables.changelog.enabled: true  
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: ****.hologres.aliyuncs.com:80
      dbname: cdcyaml_test
      username: ${secret_values.holo-username}
      password: ${secret_values.holo-password}
      sink.type-normalize-strategy: BROADEN
      
    route:
      # Merge multiple user tables and sync data to a sink
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
    Note

    The preceding code synchronizes all tables in the tpc_ds database to a downstream database and tables with the same names. To write them to a different destination database (such as ods_tps_ds), configure the route module as follows:

    route:
      # Sync data from user tables to a destination
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
      # Sync data from tpc_ds to ods_tps_ds
      - source-table: tpc_ds.\.*
        sink-table: ods_tps_ds.<>
        replace-symbol: <>
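
    In the second rule, <> is the placeholder defined by replace-symbol: for each source table, the original table name replaces <> in sink-table, so every table in tpc_ds is written to a table with the same name in the ods_tps_ds database.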

Step 2: Start the job

  1. Click Deploy. In the dialog box, click Confirm.

  2. In the left navigation menu, choose O&M > Deployments. On the Deployments page, find your job deployment and click Start in the Actions column. For startup configurations, see Start a job deployment.

  3. In the dialog box, click Start.

    You can view the deployment status and related information on the Deployments page.

Step 3: View the full data synchronization result

  1. Log on to the Hologres console.

  2. In the left navigation pane, click Go to HoloWeb. On the Metadata Management page of HoloWeb, view the 24 tables and table data in the tpc_ds database of the Hologres instance.

  3. On the Metadata Management page, view the schema of the users table in the my_user database.

    The following figure shows the table schema and data after full data synchronization.

    • Table schema

      The _db_name and _table_name columns store the source database name and the source table name. They are part of the joint primary key that keeps data unique after the tables are merged; see the sketch after this list for an illustration.

    • Table data

      In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:

      select * from users order by _db_name,_table_name,id;

      The following figure shows the table data.
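
    For reference, the following is a minimal, hypothetical sketch of the merged table definition. The connector derives the actual column types from the source schema, so use it only to illustrate the joint primary key:

    -- Hypothetical DDL in the my_user database (column types are assumptions)
    CREATE TABLE users (
        _db_name    TEXT NOT NULL,   -- source database name added during the merge
        _table_name TEXT NOT NULL,   -- source table name added during the merge
        id          BIGINT NOT NULL, -- original primary key of the source tables
        name        TEXT,
        PRIMARY KEY (_db_name, _table_name, id) -- joint primary key keeps merged rows unique
    );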

Step 4: View the incremental synchronization result

After full data synchronization is complete, Flink seamlessly switches to incremental synchronization. You can determine which phase a data synchronization deployment is in based on the value of currentEmitEventTimeLag at a specific point in time on the Alarm tab in the development console of Realtime Compute for Apache Flink.

  1. Log on to the Management Console of Realtime Compute for Apache Flink.

  2. Find your Flink workspace and click Console in the Actions column.

  3. In the left navigation menu, choose O&M > Deployments. On the Deployments page, click the name of your job deployment.

  4. Click the Alarm tab.

  5. View the chart of currentEmitEventTimeLag to determine the data synchronization phase.

    • If currentEmitEventTimeLag is 0, the deployment is in the full data synchronization phase.

    • If currentEmitEventTimeLag is greater than 0, the deployment has entered the incremental synchronization phase.

  6. Verify the real-time synchronization of data changes and schema changes.

    Flink CDC enables replication of data and schema changes during the incremental phase. To verify these capabilities, modify the table schema and data in source tables user01 to user09.

    1. Log on to an ApsaraDB RDS for MySQL instance by using the DMS console.

    2. In the user_db2 database, run the following commands to modify the schema of the user02 table, insert data into the user02 table, and update data in the user05 table:

      USE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- Add the age column to the user02 table.
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- Insert a row that includes the age value into the user02 table.
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- Change the name value of the row whose id is 15 to uppercase letters.
    3. In the Hologres console, view the changes in the schema and data of the users table.

      In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:

      select * from users order by _db_name,_table_name,id;

      The following figure shows the table data. This indicates that the data and schema changes to the user02 table are replicated to Hologres.
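
      As an additional check, Hologres is compatible with PostgreSQL, so you can list the columns of the users table with a standard information_schema query. A minimal sketch, using the table and column names from this example:

      -- Confirm that the age column was added to the merged users table
      SELECT column_name, data_type
      FROM information_schema.columns
      WHERE table_name = 'users';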

(Optional) Step 5: Configure resource settings

To ensure optimal job performance, adjust job parallelism and TaskManager resources based on your data size. Procedure:

  1. In the left navigation menu, choose O&M > Deployments. On the Deployments page, click the name of your job deployment.

  2. Click the Configuration tab. In the Resources section, click Edit.

  3. Set Task Manager Memory, Parallelism, and other parameters as needed.

  4. Click Save.

  5. Restart the job deployment.

    The new resource settings take effect after the deployment restarts.
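
Depending on your engine version, the parallelism of a YAML ingestion draft can also be declared directly in the draft through a pipeline block, instead of or in addition to the console resource settings. A minimal sketch, assuming this option is available in your version:

  pipeline:
    # Assumed option: default parallelism applied to the whole ingestion pipeline
    parallelism: 4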

References