Realtime Compute for Apache Flink uses Flink Change Data Capture (CDC) to ingest data from a source to a destination. You can develop YAML jobs to synchronize data. This topic describes how to create a Flink CDC data ingestion job to synchronize all data from a MySQL database to StarRocks.
Prerequisites
A Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
Upstream and downstream storage
An RDS MySQL instance is created. For more information, see Create an RDS MySQL instance.
A StarRocks instance is created. For more information, see Procedure.
Note: The RDS MySQL and StarRocks instances must be in the same virtual private cloud (VPC) as the Flink workspace. Otherwise, you must establish a network connection and configure an IP address whitelist for the RDS MySQL instance. For more information, see How do I access other services across VPCs?, How do I access the Internet?, and How do I configure a whitelist?.
Background information
Assume that a MySQL instance has a database named order_dw_mysql. This database contains three business tables: orders, orders_pay, and product_catalog. To develop a Flink CDC data ingestion job that synchronizes these tables and their data to the order_dw_sr database in StarRocks, follow these steps:
Step 1: Prepare RDS MySQL test data
Create a database and an account.
On the RDS MySQL instance, create a database named order_dw_mysql and a standard account that has read and write permission on the database. For more information, see Create databases and accounts and Manage databases.
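If you prefer SQL over the RDS console, the statements below are a minimal sketch of the equivalent setup. The account name flink_user and its password are hypothetical placeholders, and on RDS MySQL the global REPLICATION grants may require the console or a privileged account. The replication privileges matter because the Flink MySQL CDC source reads the binlog.
-- A minimal sketch of the console steps; flink_user and its password are placeholders.
CREATE DATABASE order_dw_mysql;
CREATE USER 'flink_user'@'%' IDENTIFIED BY '********';
-- Read and write permission on the test database:
GRANT SELECT, INSERT, UPDATE, DELETE ON order_dw_mysql.* TO 'flink_user'@'%';
-- The MySQL CDC source also needs these global privileges to read the binlog:
GRANT SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';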
Log on to the RDS MySQL instance using DMS.
For more information, see Log on to an RDS MySQL instance using DMS.
In the SQL Console, enter the following commands and click Execute to create three business tables and insert data.
CREATE TABLE `orders` (
  order_id bigint not null primary key,
  user_id varchar(50) not null,
  shop_id bigint not null,
  product_id bigint not null,
  buy_fee numeric(20,2) not null,
  create_time timestamp not null,
  update_time timestamp not null default now(),
  state int not null
);

CREATE TABLE `orders_pay` (
  pay_id bigint not null primary key,
  order_id bigint not null,
  pay_platform int not null,
  create_time timestamp not null
);

CREATE TABLE `product_catalog` (
  product_id bigint not null primary key,
  catalog_name varchar(50) not null
);

-- Prepare data
INSERT INTO product_catalog VALUES
  (1, 'phone_aaa'),
  (2, 'phone_bbb'),
  (3, 'phone_ccc'),
  (4, 'phone_ddd'),
  (5, 'phone_eee');

INSERT INTO orders VALUES
  (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
  (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
  (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
  (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
  (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
  (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
  (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);

INSERT INTO orders_pay VALUES
  (2001, 100001, 1, '2023-02-15 17:40:56'),
  (2002, 100002, 1, '2023-02-15 17:40:56'),
  (2003, 100003, 0, '2023-02-15 17:40:56'),
  (2004, 100004, 0, '2023-02-15 17:40:56'),
  (2005, 100005, 0, '2023-02-15 18:40:56'),
  (2006, 100006, 0, '2023-02-15 18:40:56'),
  (2007, 100007, 0, '2023-02-15 18:40:56');
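Optionally, you can sanity-check the setup from the same SQL Console before developing the job. This sketch assumes an RDS MySQL instance with default settings; the MySQL CDC source requires the binlog to be enabled in ROW format.
-- Optional sanity checks.
SHOW VARIABLES LIKE 'log_bin';        -- expected: ON (binlog enabled)
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW (required by the MySQL CDC source)
SELECT COUNT(*) FROM orders;          -- expected: 7
SELECT COUNT(*) FROM orders_pay;      -- expected: 7
SELECT COUNT(*) FROM product_catalog; -- expected: 5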
Step 2: Develop a Flink CDC data ingestion job
Log on to the Realtime Compute for Apache Flink console.
Click Console to navigate to the required workspace.
In the navigation pane on the left, choose Development > Data Ingestion.
Click the create icon, click New from Template, select MySQL to StarRocks Data Synchronization, and then click Next.
Enter a Job Name, specify a Storage Location, select an Engine Version, and then click OK.
Configure the YAML job code.
The following code provides an example of how to synchronize all tables from the order_dw_mysql database in MySQL to the order_dw_sr database in StarRocks.
source:
  type: mysql
  hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: order_dw_mysql.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from newly created tables during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
  load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
  username: ${secret_values.starrocksusername}
  password: ${secret_values.starrockspassword}
  table.create.properties.replication_num: 1
  sink.buffer-flush.interval-ms: 5000 # Flush data every 5 seconds.

route:
  - source-table: order_dw_mysql.\.*
    sink-table: order_dw_sr.<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db

pipeline:
  name: Sync MySQL Database to StarRocks

The following table describes the configuration information required for this example. For more information about data ingestion parameters, see MySQL and StarRocks.
Note: YAML jobs support only project variables. You can use variables to avoid exposing plaintext passwords and other sensitive information. For more information, see Variable Management.
| Category | Parameter | Description | Example value |
| --- | --- | --- | --- |
| source | hostname | The IP address or hostname of the MySQL database. We recommend that you use a VPC address. | rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com |
| source | port | The port number of the MySQL database service. | 3306 |
| source | username | The username for the MySQL database service. Use the account information that you created in Step 1: Prepare RDS MySQL test data. | ${secret_values.mysqlusername} |
| source | password | The password for the MySQL database service. | ${secret_values.mysqlpassword} |
| source | tables | The names of the MySQL tables. Regular expressions are supported to read data from multiple tables. This topic synchronizes all tables and data in the order_dw_mysql database. | order_dw_mysql.\.* |
| source | server-id | A numeric ID for the database client. You can specify a range so that each parallel source reader uses a unique ID. | 8601-8604 |
| sink | jdbc-url | The Java Database Connectivity (JDBC) URL. Specify the IP address and query port of the frontend (FE) in the jdbc:mysql://ip:port format. You can view the FE internal network address and query port of the target instance on the Instance Details tab in the E-MapReduce console. | jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 |
| sink | load-url | The HTTP service URL used to connect to the FE node. You can view the FE internal endpoint and HTTP port of the target instance on the Instance Details tab in the E-MapReduce console. | fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 |
| sink | username | The username for the StarRocks connection. Use the username that you specified when you created the StarRocks instance. Note: This example uses variables to avoid exposing plaintext passwords and other sensitive information. For more information, see Variable Management. | ${secret_values.starrocksusername} |
| sink | password | The password for the StarRocks connection. | ${secret_values.starrockspassword} |
| sink | sink.buffer-flush.interval-ms | The flush interval for the internal buffer. Because the data volume in this example is small, a short interval (5 seconds) is set so that you can observe the results sooner. | 5000 |
| route | source-table | Specifies the source tables to be routed. You can use a regular expression to match multiple tables. For example, order_dw_mysql.\.* routes all tables in the order_dw_mysql database. | order_dw_mysql.\.* |
| route | sink-table | Specifies the destination for data routing. You can use the symbol from replace-symbol as a placeholder for each source table name to implement many-to-many routing. For more information about routing rules, see Route module. | order_dw_sr.<> |
| route | replace-symbol | The string that represents the source table name when you use the pattern matching feature. | <> |
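To make the route rule concrete: with replace-symbol set to <>, each matched source table name is substituted into order_dw_sr.<>, so order_dw_mysql.orders is routed to order_dw_sr.orders, and likewise for orders_pay and product_catalog. After the job is running, a sketch like the following, run in the StarRocks SQL Editor, should reflect that mapping.
-- Expected routing result (run in StarRocks after the job is running).
SHOW DATABASES;               -- order_dw_sr is created automatically
SHOW TABLES FROM order_dw_sr; -- expected: orders, orders_pay, product_catalog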
Click Deploy. In the dialog box that appears, click OK.
Step 3: Start the Flink CDC data ingestion job
On the Job O&M page, find the target YAML job and click Start in the Actions column.
In the dialog box that appears, select the start mode and click Start. In this example, Stateless Start is selected. For more information about parameter settings, see Start a job. After the job starts, you can view its runtime information and status on the Job O&M page.
Step 4: View the synchronization result in StarRocks
After the YAML job enters the Running state, you can view the data synchronization results in StarRocks.
Connect to the StarRocks instance using EMR StarRocks Manager.
In the navigation pane on the left, click SQL Editor. On the Databases tab, click the refresh icon. A database named order_dw_sr appears under default_catalog.
On the Query List tab, click +File to create a query script. Then, enter the following SQL statements and click Run.
SELECT * FROM default_catalog.order_dw_sr.orders ORDER BY order_id;
SELECT * FROM default_catalog.order_dw_sr.orders_pay ORDER BY pay_id;
SELECT * FROM default_catalog.order_dw_sr.product_catalog ORDER BY product_id;

View the synchronization results below the statements.
You can see that the tables and data from the MySQL database now exist in StarRocks.
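To also verify incremental synchronization, you can change data on the MySQL side and re-run the queries above after the sink flush interval (5 seconds in this example) has elapsed. The row values below are illustrative.
-- Run in the MySQL (DMS) SQL Console; order_id 100008 is an illustrative value.
INSERT INTO orders VALUES
  (100008, 'user_002', 12346, 2, 4000.04, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1);
UPDATE orders SET state = 2 WHERE order_id = 100001;

-- Then re-run in StarRocks; the inserted and updated rows should appear:
SELECT * FROM default_catalog.order_dw_sr.orders ORDER BY order_id;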

References
For more information about how to develop a Flink CDC data ingestion job, see Develop a Flink CDC data ingestion job (Public Preview).
For more information about the source, sink, transform, and route modules in Flink CDC data ingestion jobs, see Development reference for Flink CDC data ingestion jobs.