Realtime Compute for Apache Flink allows you to ingest data into data warehouses in real time. It features unified full and incremental data synchronization, automatic metadata discovery, schema change replication, and whole-database synchronization, streamlining real-time data ingestion into data warehouses. This topic describes how to create a job that synchronizes data from ApsaraDB RDS for MySQL to Hologres using Realtime Compute for Apache Flink.
Background
Consider an ApsaraDB RDS for MySQL instance with four databases: tpc_ds, user_db1, user_db2, and user_db3. The tpc_ds database contains 24 tables with varying structures. Each of the user_db1, user_db2, and user_db3 databases contains three tables, all sharing the same structure (referred to as user01 through user09). In the Database Management Service (DMS) console, the instance is represented as follows:
To sync data from multiple user tables to a Hologres table, do the following:
This topic illustrates how to use Flink CDC for data ingestion, covering the synchronization of an entire database, distributed data sources, full and incremental data, and schema changes. For details, see Flink CDC data ingestion job development (public preview).
Prerequisites
To access Flink as a Resource Access Management (RAM) identity, ensure you have the required permissions. See Permission management.
You have created a Realtime Compute for Apache Flink workspace. See Create a workspace.
You have created upstream and downstream storage systems.
Note: Ensure your ApsaraDB RDS for MySQL instance, Hologres instance, and Flink workspace reside in the same virtual private cloud (VPC). To enable internet access, see How does Realtime Compute for Apache Flink access the Internet?.
You have prepared test data and configured the IP address whitelists of MySQL and Hologres. For more information, see Prepare test data and Configure IP address whitelists.
Prepare test data
Click tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql to download test data.
In the DMS console, upload the test data to your ApsaraDB RDS for MySQL instance.
Log on to an ApsaraDB RDS for MySQL instance from the DMS console.
For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
On the SQLConsole tab, enter the following commands and click Execute.
The following commands create the tpc_ds, user_db1, user_db2, and user_db3 databases.
    CREATE DATABASE tpc_ds;
    CREATE DATABASE user_db1;
    CREATE DATABASE user_db2;
    CREATE DATABASE user_db3;
Click Data Import. On the page that appears, click the Large Data Import tab.
On the Large Data Import tab, select the destination database and upload the corresponding SQL file. Click Submit and then Execute Change. In the dialog box, click Confirm Execution.
Repeat this step to import data files to the tpc_ds, user_db1, user_db2, and user_db3 databases.
Log on to the Hologres console and create the my_user database to receive data.
For more information, see Create a database.
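Before moving on, it can help to confirm that the MySQL import completed. The query below is a minimal sketch to run on the SQLConsole tab in DMS. It assumes the account can read MySQL's information_schema and that the four databases use the names created above. Going by the Background section, the expected counts are 24 tables for tpc_ds and three tables for each of user_db1, user_db2, and user_db3.

```sql
-- Count the base tables in each imported database (MySQL).
SELECT table_schema,
       COUNT(*) AS table_count
FROM information_schema.tables
WHERE table_schema IN ('tpc_ds', 'user_db1', 'user_db2', 'user_db3')
  AND table_type = 'BASE TABLE'   -- ignore views
GROUP BY table_schema
ORDER BY table_schema;
```

A database that is missing from the result, or that shows a lower count, usually means its SQL file was not imported or the import was only partially executed.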
Configure IP address whitelists
To allow Realtime Compute for Apache Flink to access ApsaraDB RDS for MySQL and Hologres instances, add the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs to the whitelists of ApsaraDB RDS for MySQL and Hologres.
Obtain the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs.
Log on to the management console of Realtime Compute for Apache Flink.
Choose in the Actions column.
View the CIDR block of the vSwitch.

Add the CIDR block of the vSwitch to the IP address whitelist of the ApsaraDB RDS for MySQL instance.
For more information, see Configure an IP address whitelist.
Add the CIDR block of the vSwitch to the IP address whitelist of the Hologres instance.
To configure an IP address whitelist for an instance in the HoloWeb console, set the Logon Method parameter to Password-free Logon when you set up a connection to the instance. For more information, see Configure an IP address whitelist.
Step 1: Develop a data synchronization job
Log on to Realtime Compute for Apache Flink's Development Console.
In the left navigation menu, choose .
Click the + icon.
Select New Draft.
In the New Draft dialog box, enter a name and choose the engine version:
Parameter | Description | Example
Name | The name of the draft to create. Note: It must be unique in the current namespace. | flink-test
Engine Version | Choose an engine version for your job. For more information, see Engine version. | vvr-11.1-jdk11-flink-1.20
Click Create.
Copy the following draft code to the code editor.
The following sample code shows how to synchronize all tables in the tpc_ds database of ApsaraDB RDS for MySQL to the tpc_ds database of Hologres, and then merge and synchronize tables user01 through user09 to the my_user.users table of Hologres. Sample code:
    source:
      type: mysql
      name: MySQL Source
      hostname: localhost
      port: 3306
      username: username
      password: password
      tables: tpc_ds.\.*,user_db[0-9]+.user[0-9]+
      server-id: 8601-8604
      # (optional) Sync comments
      include-comments.enabled: true
      # (optional) Distribute unbounded chunks first to avoid TM OOMs
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (optional) Deserialize the changelog events for tables explicitly captured
      scan.only.deserialize.captured.tables.changelog.enabled: true

    sink:
      type: hologres
      name: Hologres Sink
      endpoint: ****.hologres.aliyuncs.com:80
      dbname: cdcyaml_test
      username: ${secret_values.holo-username}
      password: ${secret_values.holo-password}
      sink.type-normalize-strategy: BROADEN

    route:
      # Merge multiple user tables and sync data to a sink
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
Note: The above code snippet syncs all tables from the tpc_ds database to a downstream database and tables with the same names. To specify a different destination (such as the ods_tps_ds database), configure the route module as follows:
    route:
      # Sync data from user tables to a destination
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
      # Sync data from tpc_ds to ods_tps_ds
      - source-table: tpc_ds.\.*
        sink-table: ods_tps_ds.<>
        replace-symbol: <>
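To preview which source tables the pattern user_db[0-9]+.user[0-9]+ is likely to capture before deploying, a query along these lines can be run against the MySQL instance. This is only an approximation. It assumes that the part before the unescaped dot is matched against the database name and the part after it against the table name, each as a full-string regular expression. It uses MySQL's REGEXP operator, not the Flink CDC matcher itself.

```sql
-- Approximate the source-table pattern user_db[0-9]+.user[0-9]+ in MySQL.
-- Assumption: database part and table part are each matched as a whole string.
SELECT table_schema,
       table_name
FROM information_schema.tables
WHERE table_schema REGEXP '^user_db[0-9]+$'
  AND table_name   REGEXP '^user[0-9]+$'
  AND table_type = 'BASE TABLE'
ORDER BY table_schema, table_name;
```

With the test data described in the Background section, the result should list the nine tables user01 through user09. Any unexpected table in the list would also be merged into my_user.users by the route rule.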
Step 2: Start the job
Click Deploy. In the dialog box, click Confirm.

In the left navigation menu, choose . On the Deployments page, find your job deployment and click Start in the Actions column. For startup configurations, see Start a job deployment.
In the dialog box, click Start.
View the status and information on the Deployments page.
Step 3: View the full data synchronization result
Log on to the Hologres console.
In the left navigation pane, click Go to HoloWeb. On the Metadata Management page of HoloWeb, view the 24 tables and table data in the tpc_ds database of the Hologres instance.
On the Metadata Management page, view the schema of the users table in the my_user database.
The following figure shows the table schema and data after full data synchronization.
Table schema

The _db_name and _table_name columns are the source database and table names. They are used as part of the joint primary key to ensure that data is unique after tables are merged.
Table data
In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:
    select * from users order by _db_name,_table_name,id;
The following figure shows the table data.
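Beyond browsing the rows, two aggregate queries can give a quicker check of the merged table. They are a sketch, assuming the users table is queried in the same HoloWeb SQL editor session as above and that the joint primary key consists of _db_name, _table_name, and id, as the schema description suggests.

```sql
-- Rows received from each source table. Compare the counts with the
-- corresponding tables in MySQL.
SELECT _db_name,
       _table_name,
       COUNT(*) AS row_count
FROM users
GROUP BY _db_name, _table_name
ORDER BY _db_name, _table_name;

-- Rows that would violate the assumed joint key. An empty result is expected.
SELECT _db_name,
       _table_name,
       id,
       COUNT(*) AS duplicates
FROM users
GROUP BY _db_name, _table_name, id
HAVING COUNT(*) > 1;
```

The first query should return one row per merged source table. A missing source table points to a routing or pattern problem rather than a data problem.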
Step 4: View the incremental synchronization result
After full data synchronization is complete, Flink seamlessly switches to incremental synchronization. You can determine the data synchronization phase of a data synchronization deployment based on the value of currentEmitEventTimeLag at a specific point in time on the Alarm tab in Realtime Compute for Apache Flink's Development Console.
Log on to the Management Console of Realtime Compute for Apache Flink.
Find your Flink workspace and click Console in the Actions column.
In the left navigation menu, choose . On the Deployments page, click the name of your job deployment.
Click the Alarm tab.
View the chart of currentEmitEventTimeLag to determine the data synchronization phase.

currentEmitEventTimeLag is 0: the job runs in the full data synchronization phase.
currentEmitEventTimeLag is greater than 0: the job has entered the incremental synchronization phase.
Verify the real-time synchronization of data changes and schema changes.
Flink CDC enables replication of data and schema changes during the incremental phase. To verify these capabilities, modify the table schema and data in source tables user01 to user09.
Log on to an ApsaraDB RDS for MySQL instance by using the DMS console.
In the user_db2 database, run the following commands to modify the schema of the user02 table, insert data into the user02 table, and update data in the user05 table:
    USE `user_db2`;
    ALTER TABLE `user02` ADD COLUMN `age` INT; -- Add the age column to the user02 table.
    INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- Insert data that includes the age information into the user02 table.
    UPDATE `user05` SET name='JARK' WHERE id=15; -- Change a specific value of the name field to uppercase letters.
In the Hologres console, view the changes in the schema and data of the users table.
In the upper-right corner of the users tab, click Query table. In the SQL editor, enter the following command and click Run:
    select * from users order by _db_name,_table_name,id;
The following figure shows the table data.
This indicates that data and schema changes to the user02 table are replicated to Hologres.
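To check only the rows touched by the statements above instead of scanning the full result, a narrower query can be used. This is a sketch that assumes the new age column has already been propagated to the users table in Hologres. If the schema change has not arrived yet, the query fails because the column does not exist.

```sql
-- Look up the inserted row (user02, id 27) and the updated row (user05, id 15).
SELECT _db_name,
       _table_name,
       id,
       name,
       age
FROM users
WHERE (_table_name = 'user02' AND id = 27)
   OR (_table_name = 'user05' AND id = 15)
ORDER BY _table_name, id;
```

Based on the source statements, the user02 row should show the name Tony with age 30, and the user05 row should show the name JARK.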
(Optional) Step 5: Configure resource settings
To ensure optimal job performance, adjust job parallelism and TaskManager resources based on your data size. Procedure:
In the left navigation menu, choose . On the Deployments page, click the name of your job deployment.
Click the Configuration tab. In the Resources section, click Edit.
Set Task Manager Memory, Parallelism, and other parameters as needed.
Click Save.
Restart the job deployment.
The new resource settings take effect after the restart.