This tutorial shows how to synchronize an entire MySQL database to StarRocks using a Flink Change Data Capture (CDC) YAML job. It covers full-database synchronization, table routing, and real-time incremental change replication, so you can see both the initial data load and live CDC updates reflected in the destination.
What you'll do:
- Prepare test tables and data in RDS MySQL
- Create a Flink CDC data ingestion job from a YAML template
- Deploy and start the job
- Verify the initial sync and real-time incremental changes in StarRocks
Prerequisites
Before you begin, make sure you have:
- A Flink workspace. See Activate Realtime Compute for Apache Flink.
- An RDS MySQL instance. See Create an RDS MySQL instance.
- A StarRocks instance. See Procedure.
The RDS MySQL and StarRocks instances must be in the same virtual private cloud (VPC) as the Flink workspace. If they are in different VPCs, establish a network connection and configure an IP address whitelist for the RDS MySQL instance. See How do I access other services across VPCs?, How do I access the Internet?, and How do I configure a whitelist?.
Background
This tutorial uses a MySQL database named order_dw_mysql that contains three business tables: orders, orders_pay, and product_catalog. The goal is to synchronize all tables and their data to a database named order_dw_sr in StarRocks.
Step 1: Prepare RDS MySQL test data
- On the RDS MySQL instance, create a database named order_dw_mysql and a standard account with read and write permissions on that database. See Create databases and accounts and Manage databases.
- Log on to the RDS MySQL instance using DMS. See Log on to an RDS MySQL instance using DMS.
- In the SQL Console, enter the following SQL and click Execute to create three tables and insert test data.
CREATE TABLE `orders` (
  order_id bigint not null primary key,
  user_id varchar(50) not null,
  shop_id bigint not null,
  product_id bigint not null,
  buy_fee numeric(20,2) not null,
  create_time timestamp not null,
  update_time timestamp not null default now(),
  state int not null
);

CREATE TABLE `orders_pay` (
  pay_id bigint not null primary key,
  order_id bigint not null,
  pay_platform int not null,
  create_time timestamp not null
);

CREATE TABLE `product_catalog` (
  product_id bigint not null primary key,
  catalog_name varchar(50) not null
);

-- Insert test data
INSERT INTO product_catalog VALUES
  (1, 'phone_aaa'),
  (2, 'phone_bbb'),
  (3, 'phone_ccc'),
  (4, 'phone_ddd'),
  (5, 'phone_eee');

INSERT INTO orders VALUES
  (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
  (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
  (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
  (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
  (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
  (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
  (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);

INSERT INTO orders_pay VALUES
  (2001, 100001, 1, '2023-02-15 17:40:56'),
  (2002, 100002, 1, '2023-02-15 17:40:56'),
  (2003, 100003, 0, '2023-02-15 17:40:56'),
  (2004, 100004, 0, '2023-02-15 17:40:56'),
  (2005, 100005, 0, '2023-02-15 18:40:56'),
  (2006, 100006, 0, '2023-02-15 18:40:56'),
  (2007, 100007, 0, '2023-02-15 18:40:56');
Step 2: Create a Flink CDC data ingestion job
- Log on to the Realtime Compute for Apache Flink console.
- Click Console to navigate to the required workspace.
- In the left navigation pane, choose Data Studio > Data Ingestion.
- Click the icon, click New from Template, select MySQL to StarRocks Data Synchronization, and then click Next.
- Enter a Job Name, specify a Storage Location, select an Engine Version, and then click OK.
- Replace the default YAML with the following configuration. This example synchronizes all tables in order_dw_mysql to order_dw_sr in StarRocks.

Note: YAML jobs support only project variables. Use variables (as shown in this example) to avoid exposing plaintext passwords and other sensitive information in your job configuration. See Variable Management.

source:
  type: mysql
  hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: order_dw_mysql.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from newly created tables during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize unbounded chunks to prevent TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
  load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
  username: ${secret_values.starrocksusername}
  password: ${secret_values.starrockspassword}
  table.create.properties.replication_num: 1
  sink.buffer-flush.interval-ms: 5000 # Flush data every 5 seconds.

route:
  - source-table: order_dw_mysql.\.*
    sink-table: order_dw_sr.<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db

pipeline:
  name: Sync MySQL Database to StarRocks

Replace the parameters in the table below with your own values. For the full parameter reference, see MySQL and StarRocks.

| Section | Parameter | Required | Description | Example value |
| source | hostname | Required | The IP address or hostname of the MySQL database. Use the VPC address for better performance. | rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com |
| source | port | Required | The port of the MySQL database service. | 3306 |
| source | username | Required | The username for the MySQL database. Use the account created in step 1. | ${secret_values.mysqlusername} |
| source | password | Required | The password for the MySQL database account. | ${secret_values.mysqlpassword} |
| source | tables | Required | The tables to synchronize. Supports regular expressions. order_dw_mysql.\.* matches all tables in the database. | order_dw_mysql.\.* |
| source | server-id | Required | A numeric ID range for the database client. | 5405-5415 |
| sink | jdbc-url | Required | The JDBC URL for the StarRocks frontend (FE) node. Format: jdbc:mysql://ip:port. Find the Internal Network Address and Query Port on the Instance Details tab in the E-MapReduce console. | jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 |
| sink | load-url | Required | The HTTP service URL for the FE node. Find the Internal Endpoint and HTTP Port on the Instance Details tab in the E-MapReduce console. | fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 |
| sink | username | Required | The username for the StarRocks connection. | ${secret_values.starrocksusername} |
| sink | password | Required | The password for the StarRocks connection. | ${secret_values.starrockspassword} |
| sink | sink.buffer-flush.interval-ms | Optional | The interval at which the internal buffer flushes data to StarRocks. Set to 5000 (5 seconds) in this example to observe results sooner. | 5000 |
| route | source-table | Required | The source tables to route. Use a regular expression to match multiple tables. | order_dw_mysql.\.* |
| route | sink-table | Required | The destination for routing. Use the replace-symbol placeholder to preserve each source table name in the destination. See Route module. | order_dw_sr.<> |
| route | replace-symbol | Required | The string that acts as a placeholder for each source table name during routing. | <> |
- Click Deploy.
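To make the route rule in this step easier to reason about, here is a minimal Python sketch of how the order_dw_mysql.\.* pattern and the <> placeholder could resolve to destination table names. It is not the Flink CDC implementation. The helper names pattern_to_regex and route are hypothetical, and the sketch assumes that an unescaped dot separates the database name from the table name while a backslash-escaped dot acts as the regular-expression wildcard.

```python
import re


def pattern_to_regex(pattern: str) -> str:
    """Turn a table pattern such as order_dw_mysql.\\.* into a plain regex.

    Assumption: an unescaped dot is the database/table separator, and a
    backslash-escaped dot is the regex wildcard that matches any character.
    """
    placeholder = "\x00"
    swapped = pattern.replace(r"\.", placeholder)  # protect escaped dots
    swapped = swapped.replace(".", r"\.")  # separators become literal dots
    return swapped.replace(placeholder, ".")  # escaped dots become wildcards


def route(table_id, source_table, sink_table, replace_symbol):
    """Return the sink table for table_id, or None if the rule does not match."""
    if re.fullmatch(pattern_to_regex(source_table), table_id) is None:
        return None
    table_name = table_id.split(".")[-1]
    # The placeholder in sink-table is replaced by the source table name.
    return sink_table.replace(replace_symbol, table_name)


# Values copied from the route section of the YAML configuration.
rule = {
    "source_table": r"order_dw_mysql.\.*",
    "sink_table": "order_dw_sr.<>",
    "replace_symbol": "<>",
}

for name in ["orders", "orders_pay", "product_catalog"]:
    print(route(f"order_dw_mysql.{name}", **rule))
print(route("other_db.orders", **rule))
```

Under these assumptions, each of the three tutorial tables maps to a table with the same name in order_dw_sr, and a table from any other database is left unrouted by this rule.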
Step 3: Start the Flink CDC data ingestion job
- On the Data Ingestion page, click Deploy. In the dialog box that appears, click OK.
- Go to Operation Center > Job O&M, find the YAML job, and click Start in the Actions column.
- Click Start. This example uses Stateless Start. For other start options, see Start a job. After the job starts, monitor its runtime status on the Job O&M page.
Step 4: Verify the synchronization result in StarRocks
After the job enters the Running state, verify that the data has been synchronized to StarRocks.
- Connect to the StarRocks instance using EMR StarRocks Manager.
- In the left navigation pane, click SQL Editor. On the Databases tab, click the icon. A database named order_dw_sr appears under default_catalog.
- On the Query List tab, click +File to create a query script. Enter the following SQL and click Run.

SELECT * FROM default_catalog.order_dw_sr.orders ORDER BY order_id;
SELECT * FROM default_catalog.order_dw_sr.orders_pay ORDER BY pay_id;
SELECT * FROM default_catalog.order_dw_sr.product_catalog ORDER BY product_id;

- Check the results. The tables and data from MySQL now exist in StarRocks.
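If you prefer to check the initial sync from a script rather than by eye, the comparison can be sketched in Python as below. The expected counts come from the test data inserted in step 1 (7 orders, 7 payments, 5 catalog entries). The function name find_mismatches and its prefix parameter are hypothetical, and the demo uses an in-memory SQLite database purely as a stand-in so the snippet runs anywhere. Against StarRocks you would instead pass a connection from a MySQL-protocol DB-API driver pointed at the FE query port, which is an assumption not covered by this tutorial.

```python
import sqlite3

# Row counts of the test data inserted into MySQL in step 1.
EXPECTED_ROWS = {"orders": 7, "orders_pay": 7, "product_catalog": 5}


def find_mismatches(conn, expected, prefix=""):
    """Return {table: (expected, actual)} for every table whose count differs."""
    mismatches = {}
    cur = conn.cursor()
    for table, want in expected.items():
        # Table names come from the trusted dict above, not from user input.
        cur.execute(f"SELECT COUNT(*) FROM {prefix}{table}")
        got = cur.fetchone()[0]
        if got != want:
            mismatches[table] = (want, got)
    return mismatches


# Stand-in demo: build tables with the expected number of rows in SQLite.
demo = sqlite3.connect(":memory:")
for table, rows in EXPECTED_ROWS.items():
    demo.execute(f"CREATE TABLE {table} (id INTEGER PRIMARY KEY)")
    demo.executemany(f"INSERT INTO {table} VALUES (?)", [(i,) for i in range(rows)])

print(find_mismatches(demo, EXPECTED_ROWS))  # empty dict when all counts match
```

For the destination in this tutorial, the prefix would be "default_catalog.order_dw_sr." so that each query targets the same fully qualified tables as the SELECT statements above.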

What's next
- To learn more about developing Flink CDC data ingestion jobs, see Develop a Flink CDC data ingestion job (Public Preview).
- For the full reference on source, sink, transform, and route modules, see Development reference for Flink CDC data ingestion jobs.