Realtime Compute for Apache Flink:Real-time database ingestion

Last Updated: Apr 28, 2026

Realtime Compute for Apache Flink simplifies real-time data ingestion by automatically handling the switch from full to incremental synchronization, metadata discovery, schema evolution, and whole-database synchronization. This topic shows you how to quickly build a data ingestion job that streams data from ApsaraDB RDS for MySQL to Hologres.

Background

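This tutorial uses an ApsaraDB RDS for MySQL instance that contains a tpc_ds database and three databases of sharded user tables: user_db1, user_db2, and user_db3. The following figure shows how these databases and tables appear in the DMS console.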

The goal is to develop a data ingestion job that synchronizes all of these tables to Hologres and merges the sharded user tables into a single table.

This topic uses Flink CDC data ingestion to perform whole-database synchronization and merge the sharded tables. A single job handles full and incremental data synchronization as well as real-time schema change synchronization.

Prerequisites

Prepare MySQL test data and a Hologres database

  1. Click tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql to download the test data files to your local machine.

  2. In the DMS console, prepare the test data in your ApsaraDB RDS for MySQL instance.

    1. Log on to your ApsaraDB RDS for MySQL instance by using DMS.

    2. In the SQL Console window, enter the following commands and click Execute.

      The following commands create four databases: tpc_ds, user_db1, user_db2, and user_db3.

      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. In the top navigation bar, click Data Import.

    4. On the Batch Data Import tab, select a database to import data into, upload the corresponding SQL file, click Submit, and then click Execute Change. In the dialog box that appears, click Confirm Execution.

      Repeat this operation to import each data file into its corresponding database: tpc_ds, user_db1, user_db2, and user_db3.

  3. In the Hologres console, create a database named my_user to store the merged user table data.

    For more information, see Create a database.

Configure an IP whitelist

To allow Flink to access the ApsaraDB RDS for MySQL and Hologres instances, add the CIDR block of your Flink workspace to the IP whitelist of both instances.

  1. Obtain the CIDR block of the Flink workspace.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the list of workspaces, find the target workspace and choose More > Workspace Details in the Actions column.

    3. In the Workspace Details dialog box, view the CIDR Block information for the Flink vSwitch.


  2. Add the Flink CIDR block to the IP whitelist of the ApsaraDB RDS for MySQL instance.

    For more information, see Configure an IP whitelist.

  3. Add the Flink CIDR block to the IP whitelist of the Hologres instance.

    When you configure a data connection in HoloWeb, set Login Method to Password-free login for current user before you configure an IP whitelist for the connection. For more information, see IP whitelist.
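A missing or mistyped whitelist entry is a common reason why a job cannot connect to either instance. The following Python sketch uses the standard ipaddress module to check whether an address falls inside any whitelist entry. The CIDR block and addresses are hypothetical placeholders; substitute the vSwitch CIDR block shown in Workspace Details and the entries configured on your instances.

```python
import ipaddress

def is_whitelisted(ip: str, whitelist: list[str]) -> bool:
    """Return True if ip falls inside any whitelist entry (single IPs or CIDR blocks)."""
    addr = ipaddress.ip_address(ip)
    for entry in whitelist:
        # strict=False also accepts entries such as "192.168.0.1/24" whose host bits are set.
        if addr in ipaddress.ip_network(entry, strict=False):
            return True
    return False

# Hypothetical values: replace with the vSwitch CIDR block of your Flink workspace.
flink_vswitch_cidr = "192.168.0.0/24"
rds_whitelist = ["127.0.0.1", flink_vswitch_cidr]

print(is_whitelisted("192.168.0.37", rds_whitelist))  # True
print(is_whitelisted("192.168.1.5", rds_whitelist))   # False
```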

Step 1: Develop the ingestion job

  1. Log on to the Flink development console and create a new job.

    1. On the Data Development > Data Ingestion page, click New.

    2. Click Blank Data Ingestion Draft.

      Realtime Compute for Apache Flink provides a rich set of code templates, each with specific use cases, code samples, and guidance. You can click a template to learn about product features and syntax for implementing your business logic.

    3. Click Next.

    4. In the New Data Ingestion Job Draft dialog box, configure the job parameters.

      • File Name: The name of the job. Note: The job name must be unique within the current project. Example: flink-test

      • Storage Location: The folder where the job's code file is stored. You can also click the New Folder icon next to an existing folder to create a subfolder. Example: Job Drafts

      • Engine Version: The Flink engine version used by the job. For information about engine version numbers, version compatibility, and important lifecycle dates, see Engine Versions. Example: vvr-11.1-jdk11-flink-1.20

    5. Click OK.

  2. Copy the following job code into the job editor.

    The following code synchronizes all tables from the tpc_ds database to Hologres and merges the sharded user tables into a single table in Hologres:

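    source:
      type: mysql
      name: MySQL Source
      # Replace the connection settings with those of your ApsaraDB RDS for MySQL instance.
      hostname: localhost
      port: 3306
      username: username
      password: password
      # Capture every table in tpc_ds and every userN table in the user_dbN databases.
      tables: tpc_ds.\.*,user_db[0-9]+.user[0-9]+
      # Server ID range for the source; each parallel reader uses a distinct ID.
      server-id: 8601-8604
      # (Optional) Synchronize table and column comments.
      include-comments.enabled: true
      # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OutOfMemory errors.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Deserialize binlog events only for the captured tables to accelerate reading.
      scan.only.deserialize.captured.tables.changelog.enabled: true

    sink:
      type: hologres
      name: Hologres Sink
      # Replace the endpoint and database name with those of your Hologres instance.
      endpoint: ****.hologres.aliyuncs.com:80
      dbname: cdcyaml_test
      # The Hologres credentials are referenced as variables (secret_values) rather than written in plaintext.
      username: ${secret_values.holo-username}
      password: ${secret_values.holo-password}
      sink.type-normalize-strategy: BROADEN
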
    route:
      # Merge and synchronize the sharded user tables to the my_user.users table.
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
    Note

    Tables from the MySQL tpc_ds database are mapped directly to identically named tables in the destination, so the route section requires no extra mapping configuration for them. To synchronize these tables to a database with a different name, such as ods_tps_ds, configure the route section as follows:

    route:
      # Merge and synchronize the sharded user tables to the my_user.users table.
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
      # Rename the database for all tables under tpc_ds and synchronize them to ods_tps_ds.
      - source-table: tpc_ds.\.*
        sink-table: ods_tps_ds.<>
        replace-symbol: <>
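To reason about which tables a job captures and where each one lands, the following Python sketch models the tables option and the route rules from the configuration above. It is a simplified model, not the connector's implementation, and it assumes that: the first unescaped dot in a pattern separates the database pattern from the table pattern, while an escaped dot (\.) acts as the regular-expression wildcard; patterns must match the whole name; the first matching route rule wins (the rules in this example never overlap); and a captured table that matches no rule keeps its database and table name. The table name customer is used only as an example input.

```python
import re

def split_pattern(pattern: str) -> tuple[str, str]:
    """Split 'db.table' at the first unescaped dot; an escaped dot becomes the regex '.'."""
    for i, ch in enumerate(pattern):
        if ch == "." and (i == 0 or pattern[i - 1] != "\\"):
            return pattern[:i], pattern[i + 1:].replace("\\.", ".")
    raise ValueError(f"no database/table delimiter in {pattern!r}")

def matches(pattern: str, db: str, table: str) -> bool:
    db_regex, table_regex = split_pattern(pattern)
    return bool(re.fullmatch(db_regex, db) and re.fullmatch(table_regex, table))

def is_captured(db: str, table: str, tables_option: str) -> bool:
    """Model the comma-separated `tables` option of the MySQL source."""
    return any(matches(p.strip(), db, table) for p in tables_option.split(","))

def route(db: str, table: str, rules: list[dict]) -> str:
    """Return the sink table for db.table under the given route rules."""
    for rule in rules:
        if matches(rule["source-table"], db, table):
            symbol = rule.get("replace-symbol")
            sink = rule["sink-table"]
            # The replace symbol is substituted with the source table name.
            return sink.replace(symbol, table) if symbol else sink
    return f"{db}.{table}"  # No rule matched: keep the original identifier.

TABLES = r"tpc_ds.\.*,user_db[0-9]+.user[0-9]+"
ROUTES = [
    {"source-table": r"user_db[0-9]+.user[0-9]+", "sink-table": "my_user.users"},
    {"source-table": r"tpc_ds.\.*", "sink-table": "ods_tps_ds.<>", "replace-symbol": "<>"},
]

print(is_captured("user_db2", "orders", TABLES))  # False
print(route("user_db2", "user02", ROUTES))        # my_user.users
print(route("tpc_ds", "customer", ROUTES))        # ods_tps_ds.customer
print(route("tpc_ds", "customer", ROUTES[:1]))    # tpc_ds.customer
```

The last call uses only the first rule, which corresponds to the original job code: the tpc_ds tables keep their names.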

Step 2: Start the job

  1. On the Data Development > Data Ingestion page, click Deploy. In the dialog box that appears, click Confirm.

  2. On the Operation Center > Job Operations page, click Start in the Actions column for the target job. Configure the parameters as required. For more information, see Start a job.

  3. Click Start.

    After the job starts, you can view its runtime information and status on the Job Operations page.

Step 3: Verify full synchronization

  1. Log on to the Hologres management console.

  2. On the Metadata Management tab, verify that the 24 tables and their data are present in the tpc_ds database of the Hologres instance.


  3. On the Metadata Management tab, check the schema of the users table in the my_user database.

    The following figures show the synchronized table schema and data.

    • Table schema

      The users table schema includes two additional columns not found in the source MySQL tables: _db_name and _table_name. These columns indicate the source database and table for each row and are part of the composite primary key, ensuring data uniqueness after the sharded tables are merged.

    • Table data

      In the upper-right corner of the users table information page, click Query Table. Enter the following command, and then click Run.

      select * from users order by _db_name,_table_name,id;

      The query result is shown in the following figure.
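Sharded tables often reuse the same id values, so a merged table keyed only on id would let rows from different shards overwrite each other. The following Python sketch illustrates why the composite primary key (_db_name, _table_name, id) avoids this, using a dictionary as a stand-in for a primary-key table with upsert semantics. The rows and names are hypothetical.

```python
# Hypothetical rows: the same id value exists in two different shards.
rows = [
    {"_db_name": "user_db1", "_table_name": "user01", "id": 1, "name": "Alice"},
    {"_db_name": "user_db2", "_table_name": "user02", "id": 1, "name": "Bob"},
]

def upsert(table: dict, row: dict, key_columns: list[str]) -> None:
    """A primary-key table keeps one row per key; a later write replaces an earlier one."""
    key = tuple(row[c] for c in key_columns)
    table[key] = row

keyed_on_id = {}
keyed_on_composite = {}
for row in rows:
    upsert(keyed_on_id, row, ["id"])
    upsert(keyed_on_composite, row, ["_db_name", "_table_name", "id"])

print(len(keyed_on_id))         # 1: Bob's row overwrote Alice's
print(len(keyed_on_composite))  # 2: both rows survive
```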

Step 4: Verify incremental synchronization

After full synchronization completes, the job automatically switches to the incremental synchronization phase without manual intervention. You can check the currentEmitEventTimeLag value on the Monitoring and Alerts tab to determine the data synchronization phase.
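If you export the metric values for automated checks, the rule used in this step (a value of 0 indicates the full synchronization phase, and a value greater than 0 indicates the incremental phase) can be written as a small helper. This Python sketch is hypothetical: how you collect the samples is outside the scope of this topic, and the sample values are made up.

```python
def sync_phase(lag: float) -> str:
    """Classify a currentEmitEventTimeLag sample by the rule described in this step."""
    if lag < 0:
        raise ValueError("lag cannot be negative")
    return "full" if lag == 0 else "incremental"

# Hypothetical samples, oldest first: the job finishes the full phase, then switches.
samples = [0, 0, 0, 1250, 830]
print(sync_phase(samples[-1]))           # incremental
print([sync_phase(s) for s in samples])  # ['full', 'full', 'full', 'incremental', 'incremental']
```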

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column for the target workspace.

  3. On the Operation Center > Job Operations page, click the name of the target job.

  4. Click the Monitoring and Alerts (or Metrics) tab.

  5. Examine the currentEmitEventTimeLag chart to determine the data synchronization phase.


    • A value of 0 indicates the full synchronization phase.

    • A value greater than 0 indicates the incremental synchronization phase.

  6. Verify real-time data and schema change synchronization.

    The MySQL CDC source supports real-time data and schema synchronization during the incremental phase. To verify this, modify the schema and data of a sharded user table in MySQL after the job enters this phase.

    1. Log on to the ApsaraDB RDS for MySQL instance by using DMS.

    2. In the user_db2 database, execute the following commands to add an age column to the user02 table, insert a row into user02, and update a row in the user05 table.

      USE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- Add the age column.
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- Insert a row that includes the age data.
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- Update another table and change the name to uppercase.
    3. In the Hologres console, check the changes to the schema and data of the users table.

      In the upper-right corner of the users table information page, click Query Table, enter the following command, and then click Run.

      select * from users order by _db_name,_table_name,id;

      The following figure shows the query result. The schema change on user02 and the data changes are propagated in real time, even though the sharded tables now have different schemas. The Hologres users table shows the new age column, the inserted record for Tony, and the updated record for JARK.
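Conceptually, after the ALTER TABLE statement the merged users table carries the union of the columns of all shards, and rows from shards that lack a column read as NULL in that column. The following Python sketch models this widening with in-memory rows, where None stands for NULL. It illustrates that assumption only and is not the connector's implementation; the user01 row is hypothetical.

```python
def merge_shards(shards: dict) -> tuple[list[str], list[dict]]:
    """Merge rows from shards with different schemas into one table with the union of columns."""
    columns = ["_db_name", "_table_name"]
    for rows in shards.values():
        for row in rows:
            for column in row:
                if column not in columns:
                    columns.append(column)  # Keep first-seen column order.
    merged = []
    for (db, table), rows in shards.items():
        for row in rows:
            full = {"_db_name": db, "_table_name": table, **row}
            # Columns missing from this shard's row become None (NULL).
            merged.append({column: full.get(column) for column in columns})
    return columns, merged

shards = {
    ("user_db1", "user01"): [{"id": 1, "name": "Alice"}],             # hypothetical row, no age column
    ("user_db2", "user02"): [{"id": 27, "name": "Tony", "age": 30}],  # row inserted after ALTER TABLE
}
columns, merged = merge_shards(shards)
print(columns)                             # ['_db_name', '_table_name', 'id', 'name', 'age']
print(merged[0]["age"], merged[1]["age"])  # None 30
```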

(Optional) Step 5: Configure job resources

For better performance, you can adjust job resources such as parallelism, TaskManager memory, and CUs based on your data volume.

  1. On the Operation Center > Job Operations page, click the name of the target job.

  2. On the Deployment Details tab, click Edit in the upper-right corner of the Resource Configuration section.

  3. Manually set resource parameters such as TaskManager memory and parallelism.

  4. On the right side of the Resource Configuration section, click Save.

  5. Restart the job.

    Resource configuration changes take effect only after you restart the job.
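If you increase the parallelism of the source, check the server-id option in the job code as well. The MySQL CDC source assigns a distinct server ID to each parallel reader, so the range (8601-8604 in this example, which provides four IDs) should contain at least as many IDs as the source parallelism. The following Python helpers are a hypothetical sketch of that check and are not part of Flink.

```python
def server_id_count(server_id: str) -> int:
    """Count the IDs in a server-id value such as '8601-8604' (a range) or '5400' (a single ID)."""
    if "-" in server_id:
        start, end = (int(part) for part in server_id.split("-", 1))
        if end < start:
            raise ValueError(f"invalid server-id range: {server_id!r}")
        return end - start + 1
    int(server_id)  # Raises ValueError if the value is not a number.
    return 1

def covers_parallelism(server_id: str, parallelism: int) -> bool:
    """True if the range provides at least one server ID per parallel reader."""
    return server_id_count(server_id) >= parallelism

print(server_id_count("8601-8604"))        # 4
print(covers_parallelism("8601-8604", 4))  # True
print(covers_parallelism("8601-8604", 8))  # False: widen the range, for example to 8601-8608
```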
