
Realtime Compute for Apache Flink: Ingest streaming data into a data warehouse

Last Updated: Mar 26, 2026

This tutorial walks you through building a real-time Change Data Capture (CDC) ingestion pipeline from MySQL to Hologres using Realtime Compute for Apache Flink. By the end, you will have a working pipeline that handles whole-database synchronization, merges sharded tables into a single target table, and propagates schema changes (DDL evolution) in real time.

The following tasks are handled automatically, so you do not need to configure them manually:

  • Switching from full synchronization to incremental synchronization

  • Discovering new tables and metadata changes

  • Propagating DDL schema changes to the downstream table

Background

This tutorial uses two sets of MySQL databases:

  • tpc_ds: one database with 24 business tables, each with a different schema

  • user_db1, user_db2, user_db3: three sharded databases, each containing three tables with identical schemas (user01–user09 across all databases)

The goal is to sync all tables from tpc_ds to Hologres and merge the nine sharded user tables into a single Hologres table named users.

The following figure shows the databases and tables in MySQL as viewed in the Data Management (DMS) console.


For details about the underlying mechanism, see How Flink CDC data ingestion works.

Prerequisites

Before you begin, ensure that you have a Realtime Compute for Apache Flink workspace, an ApsaraDB RDS for MySQL instance, and a Hologres instance.

The ApsaraDB RDS for MySQL and Hologres instances must be in the same region and virtual private cloud (VPC) as your Flink workspace. If they are not, configure network connectivity between them. For more information, see How does Realtime Compute for Apache Flink access the Internet?

Prepare MySQL test data and a Hologres database

  1. Download the following SQL test files to your local machine: tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql.

  2. Load the test data into your ApsaraDB RDS for MySQL instance using DMS.

    1. Log on to your ApsaraDB RDS for MySQL instance using DMS. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.

    2. In the SQL Console window, enter the following statements to create the four databases, and then click Execute.

        CREATE DATABASE tpc_ds;
        CREATE DATABASE user_db1;
        CREATE DATABASE user_db2;
        CREATE DATABASE user_db3;
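      To confirm that the databases were created, you can run a quick check in the same SQL Console window:

        -- List databases; tpc_ds and user_db1 through user_db3 should appear.
        SHOW DATABASES;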

    3. On the top shortcut menu bar, click Data Import.

    4. On the Data Import tab, select the target database, upload the corresponding SQL file, click Submit, and then click Execute Change. In the dialog box that appears, click Confirm Execution. Repeat this step for each of the four databases: tpc_ds, user_db1, user_db2, and user_db3.

  3. In the Hologres console, create a database named my_user to store the merged user table data. For more information, see Create a database.
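    If you prefer SQL over console operations, you can alternatively create the database in HoloWeb's SQL editor. This is a sketch and assumes your account has the privileges required to create databases:

        -- Create the target database that will hold the merged user table.
        CREATE DATABASE my_user;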

Configure IP whitelists

Add the CIDR block of your Flink workspace to the IP whitelists of both the ApsaraDB RDS for MySQL instance and the Hologres instance so that Flink can access them.

  1. Get the VPC CIDR block of your Flink workspace.

    1. Log on to the Realtime Compute console.

    2. In the Workspace list, find your workspace. In the Actions column, choose More > Workspace Details.

    3. In the Workspace Details dialog box, note the VPC CIDR block of the vSwitch that Flink uses.

  2. Add the Flink CIDR block to the IP whitelist of your ApsaraDB RDS for MySQL instance. For more information, see Configure IP whitelists.


  3. Add the Flink CIDR block to the IP whitelist of your Hologres instance. When configuring a data connection in HoloWeb, set Login Method to Passwordless Login for Current User before configuring the IP whitelist. For more information, see IP whitelist.


Step 1: Develop a data ingestion job

  1. Log on to the Flink development console and create a new job.

    1. On the Data Development > Data Ingestion page, click New.

    2. Click Blank Data Ingestion Draft. Flink also provides code templates that include use cases, sample code, and usage guidance. Click any template to explore Flink features and syntax.

    3. Click Next.

    4. In the New Data Ingestion Job Draft dialog box, configure the following parameters.

      • File Name: The name of the job. The name must be unique within the current project. Example: flink-test.

      • Engine Version: The Flink engine version. For version details and lifecycle milestones, see Engine version overview. Example: vvr-11.1-jdk11-flink-1.20.

    5. Click OK.

  2. Copy the following job code into the job editor. This job synchronizes all tables from the tpc_ds database to Hologres and merges the nine sharded user tables into a single users table. The tables pattern uses regular expressions: tpc_ds.\.* matches all tables in tpc_ds, and user_db[0-9]+.user[0-9]+ matches all sharded user tables across user_db1, user_db2, and user_db3.

    All tables from the MySQL tpc_ds database sync directly to identically named tables in Hologres, so no additional route entry is required for them. To sync them to a differently named database, such as ods_tps_ds, add a route entry instead:

        route:
          # Merge sharded user tables into the my_user.users table.
          - source-table: user_db[0-9]+.user[0-9]+
            sink-table: my_user.users
          # Sync all tables from the tpc_ds database to the ods_tps_ds database.
          - source-table: tpc_ds.\.*
            sink-table: ods_tps_ds.<>
            replace-symbol: <>
    source:
      type: mysql
      name: MySQL Source
      hostname: localhost
      port: 3306
      username: username
      password: password
      tables: tpc_ds.\.*,user_db[0-9]+.user[0-9]+
      # A server ID range enables parallel incremental snapshot reading. The range must contain at least as many IDs as the source parallelism.
      server-id: 8601-8604
      # (Optional) Synchronize table and column comments.
      include-comments.enabled: true
      # (Optional) Prioritize unbounded chunk distribution to avoid possible TaskManager out-of-memory (OOM) errors.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Enable parsing filters to speed up reading.
      scan.only.deserialize.captured.tables.changelog.enabled: true
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: ****.hologres.aliyuncs.com:80
      dbname: cdcyaml_test
      username: ${secret_values.holo-username}
      password: ${secret_values.holo-password}
      # Map source types to broader Hologres types so that compatible type changes in MySQL do not break the sink.
      sink.type-normalize-strategy: BROADEN
    
    route:
      # Merge sharded user tables into the my_user.users table.
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
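    Before deploying, you can preview which tables the sharded-table pattern should capture by running an illustrative query in DMS against MySQL's information_schema. The anchored REGEXP patterns below approximate the Flink CDC matching behavior; they are not the connector's own implementation:

        -- Illustrative only: expect the nine user tables across user_db1 through user_db3.
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE table_schema REGEXP '^user_db[0-9]+$'
          AND table_name REGEXP '^user[0-9]+$'
        ORDER BY table_schema, table_name;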

Step 2: Start the job

  1. On the Data Development > Data Ingestion page, click Deploy. In the dialog box that appears, click Confirm.


  2. On the Operation Center > Job Operations page, find your job and click Start in the Actions column. For more information, see Start a job.

  3. In the panel that appears, click Start. After the job starts, monitor its status and runtime information on the Job Operations page.


Step 3: Monitor full synchronization results

  1. Log on to the Hologres Management Console.

  2. On the Metadata Management tab, verify that the 24 tables and their data from the tpc_ds database appear in Hologres.


  3. On the Metadata Management tab, check the schema and data of the users table in the my_user database.

    • Schema: The users table includes two extra columns, _db_name and _table_name, which record the source database and source table of each row. Together with the original primary key, they form a composite primary key that ensures row uniqueness after the sharded tables are merged.

    • Table data: In the upper-right corner of the users table details page, click Query Table, enter the following query, and then click Run.

          select * from users order by _db_name,_table_name,id;
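    The merged table described above is roughly equivalent to the following Hologres DDL. This is a sketch for illustration only: the id and name columns are taken from the sample data, and the actual table is created automatically by the pipeline rather than by hand:

          -- Sketch of the auto-created merged table (column set assumed, not authoritative).
          CREATE TABLE users (
              _db_name    TEXT   NOT NULL,  -- source database, for example user_db2
              _table_name TEXT   NOT NULL,  -- source table, for example user02
              id          BIGINT NOT NULL,
              name        TEXT,
              PRIMARY KEY (_db_name, _table_name, id)
          );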

Step 4: Monitor incremental synchronization results

After full synchronization completes, the job automatically switches to incremental synchronization. No manual intervention is required.

Use the currentEmitEventTimeLag metric to determine which phase the job is in:

  • Value = 0: full synchronization is still in progress

  • Value > 0: incremental synchronization has started

To check the metric:

  1. Log on to the Realtime Compute console.

  2. Find your workspace and click Console in the Actions column.

  3. On the Operation Center > Job Operations page, click your job name.

  4. Click the Monitoring and Alerts tab (also labeled Data Curve in some versions).

  5. Check the currentEmitEventTimeLag curve to identify the synchronization phase.


  6. Verify that schema and data changes sync in real time. After the job enters incremental synchronization, run the following commands one at a time in DMS. After each command, check the users table in Hologres to confirm that the change appears immediately.

    1. Log on to your ApsaraDB RDS for MySQL instance using DMS. For more information, see Log on to ApsaraDB RDS for MySQL using DMS.

    2. In the user_db2 database, run each of the following commands individually and verify the result in Hologres after each one.

      Add a column (DDL schema change):

      USE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;

      Go to the Hologres console, click Query Table on the users table details page, run the query below, and click Run. Confirm that the age column appears in the result.

      select * from users order by _db_name,_table_name,id;

      Insert a row with the new column:

      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

      Run the same query again and confirm that the Tony record appears.

      Update a row in a different sharded table:

      UPDATE `user05` SET name='JARK' WHERE id=15;

      Run the same query again and confirm that the name is updated to JARK.

      After all three commands, the users table in Hologres reflects the new age column from the ALTER TABLE statement, the inserted Tony record, and the updated JARK record, even though the schema change was made to only one of the nine sharded tables.

(Optional) Step 5: Configure job resources

Adjust concurrency and TaskManager resources to optimize job performance for your data volume.

  1. On the Operation Center > Job Operations page, click your job name.

  2. On the Deployment Details tab, click Resource Configuration, and then click Edit in the upper-right corner.

  3. Set resource parameters such as TaskManager memory and concurrency.

  4. Click Save in the upper-right corner of the Resource Configuration section.

  5. Restart the job. Resource configuration changes take effect only after a restart.
