This topic describes how to synchronize data from ApsaraDB RDS for MySQL to a StarRocks cluster in E-MapReduce (EMR) by using the Ververica Platform (VVP) of Alibaba Cloud Realtime Compute for Flink.
Prerequisites
You have activated fully managed Flink for Alibaba Cloud Realtime Compute. For more information, see Activate fully managed Flink.
You have created a StarRocks cluster. For more information, see Create a StarRocks cluster.
Note: Set the number of core instances to 3.
You have created an ApsaraDB RDS for MySQL instance. For more information, see Create an ApsaraDB RDS for MySQL instance.
Note: This topic uses MySQL 5.7 as an example.
Limits
The ApsaraDB RDS for MySQL instance must be version 5.7 or later.
The VVP workspace, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be in the same VPC and zone.
The StarRocks cluster must be EMR-3.42.0 or later.
The Flink engine must be vvr-4.0.11-flink-1.13 or later.
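To confirm the MySQL version requirement before you start, you can run a version query on the ApsaraDB RDS for MySQL instance, for example from a DMS session. This is a minimal sketch that covers only the MySQL limit; the EMR cluster version is shown in the EMR console, and the Flink engine version is the one you select when you create the job.

```sql
-- Run on the ApsaraDB RDS for MySQL instance.
-- The result should be 5.7.x or later (this topic uses 5.7).
SELECT VERSION();
```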
Notes
If you modify an ApsaraDB RDS table using the ALTER TABLE statement, you must manually synchronize the schema changes to StarRocks. If you create a new table in ApsaraDB RDS, you must run the data synchronization job again.
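For example, adding a column could look like the following sketch. The column name `new_col` is hypothetical; the point is that the same change must be applied on both sides, and because the sink table and the INSERT statement in Step 3 list their columns explicitly, they also need the new column before you redeploy the job.

```sql
-- 1. On ApsaraDB RDS for MySQL (source): add a hypothetical column.
ALTER TABLE test_cdc.runoob_tbl ADD COLUMN new_col int DEFAULT NULL;

-- 2. On StarRocks (destination): add the matching nullable column manually.
ALTER TABLE test_cdc.runoob_tbl1 ADD COLUMN new_col INT NULL;

-- 3. In the Flink job (Step 3): add `new_col INT` to the sr_result DDL,
--    add new_col to the SELECT list, then redeploy and restart the job.
```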
Procedure
Step 1: Prepare test data
Create a test database and account. For more information, see Create a database and an account.
After you create the database and account, grant read and write permissions to the test account.
Note: In this topic, the database is named `test_cdc` and the account is named `emr_test`.
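If you manage permissions with SQL rather than in the console, the grants could look roughly like the following. This is a sketch under assumptions: the host pattern `'%'` and the exact privilege list are illustrative. A MySQL CDC source reads the binlog, which generally requires the replication privileges in addition to SELECT; check the Realtime Compute MySQL connector documentation for the authoritative list, because the RDS console may manage some of these grants for you.

```sql
-- Hypothetical SQL equivalent of the console grant for the test account.
-- Read and write access to the test database:
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, ALTER ON test_cdc.* TO 'emr_test'@'%';

-- Global privileges typically needed to read the binlog:
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'emr_test'@'%';

-- Review the resulting privileges.
SHOW GRANTS FOR 'emr_test'@'%';
```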
Use the test account to connect to the MySQL instance. For more information, see Log on to an ApsaraDB RDS for MySQL instance using DMS.
Execute the following statements to create a table and insert a test row.
/* Statements to run in MySQL */
CREATE TABLE test_cdc.`runoob_tbl` (
  `runoob_id` int unsigned NOT NULL AUTO_INCREMENT,
  `runoob_title` varchar(100) NOT NULL,
  `runoob_author` varchar(40) NOT NULL,
  `submission_date` date DEFAULT NULL,
  `add_col` int DEFAULT NULL,
  PRIMARY KEY (`runoob_id`)
) ENGINE=InnoDB;

INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
VALUES (18, 'first', 'tom', '2025-06-22', 3);
On the Security page of the ApsaraDB RDS console, add the CIDR block of Flink to the whitelist. For more information, see Step 2 in Connect to an ApsaraDB RDS for MySQL instance from a client or the command line.
In the Realtime Compute console, you can find the Flink CIDR block by clicking Details in the Actions column of the target workspace.
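Before you continue, you can check the source side from the same DMS session. The row inserted above should be returned. Because change data capture reads row-based binlog events, `binlog_format` is expected to be `ROW` and `binlog_row_image` is generally expected to be `FULL`; ApsaraDB RDS for MySQL typically uses these settings by default, but treat that as an assumption to verify.

```sql
-- The test row inserted in the previous step should be returned.
SELECT * FROM test_cdc.runoob_tbl;

-- CDC sources read row-based binlog events; expect ROW.
SHOW VARIABLES LIKE 'binlog_format';

-- Full row images are generally expected as well; expect FULL.
SHOW VARIABLES LIKE 'binlog_row_image';
```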
Log on to the StarRocks cluster using the Secure Shell (SSH) protocol. For more information, see Log on to a cluster.
Execute the following command to connect to the StarRocks cluster.
mysql -h127.0.0.1 -P 9030 -uroot
Execute the following statements to create a user and a database, grant permissions, and create a table.
/* Statements to run in StarRocks */
CREATE USER 'test' IDENTIFIED BY '123456';
CREATE DATABASE test_cdc;
GRANT ALL ON test_cdc TO test;
GRANT ALL ON ALL TABLES IN DATABASE test_cdc TO test;
USE test_cdc;
CREATE TABLE `runoob_tbl1` (
  `runoob_id` bigint(20) NOT NULL COMMENT "",
  `runoob_title` varchar(100) NOT NULL COMMENT "",
  `runoob_author` varchar(40) NOT NULL COMMENT "",
  `submission_date` date NULL COMMENT "",
  `add_col` int(11) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`runoob_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`runoob_id`) BUCKETS 8;
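To check the StarRocks side, you can inspect the table, the backend (BE) nodes, and the new user's privileges in the same session. This sketch assumes that the new table uses the StarRocks default of 3 replicas per tablet and that the BE nodes run on the core instances, which would explain why the prerequisites ask for three core instances; confirm the node count against the `SHOW BACKENDS` output.

```sql
-- Confirm the destination table schema (primary key table on runoob_id).
DESC test_cdc.runoob_tbl1;

-- List the BE nodes; with 3 replicas per tablet,
-- at least three alive BEs are needed to create the table.
SHOW BACKENDS;

-- Confirm the privileges granted to the test user.
SHOW GRANTS FOR 'test'@'%';
```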
Step 2: Create a MySQL catalog using VVP
On the Data Management page of the Realtime Compute console, click Create Catalog.
In the Create Catalog dialog box, on the Built-in Catalog tab, select MySQL Catalog and click Next.
The following code provides an example of the MySQL catalog configuration.
NoteThe parameter settings are for reference only. Configure the parameters based on your requirements.
CREATE CATALOG mysql WITH (
  'type' = 'mysql',
  'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'emr_test',
  'password' = '******',
  'default-database' = 'test_cdc'
);
Parameter | Description |
type | The catalog type. Set the value to `mysql`. |
hostname | The internal endpoint of the ApsaraDB RDS instance. To copy it, go to the Database Connection page of the ApsaraDB RDS instance and click the internal endpoint. Example: `rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com`. |
port | The port number of the MySQL database service. Default value: 3306. |
username | The username of the MySQL database service. Enter the username of the account that you created in Step 1: Prepare test data. This topic uses `emr_test` as an example. |
password | The password of the MySQL database service. Enter the password of the account that you created in Step 1: Prepare test data. |
default-database | The default MySQL database name. Enter the name of the database that you created in Step 1: Prepare test data. This topic uses `test_cdc` as an example. |
Click OK.
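After the catalog is created, you can confirm that Flink sees the source table. The following statements are a sketch that uses standard Flink SQL catalog commands; running them in a SQL draft in the Realtime Compute console is an assumption about your workspace setup. The MySQL `int unsigned` primary key is expected to appear as a wider signed type such as BIGINT, which matches the `runoob_id BIGINT` column of the sink table in the next step.

```sql
-- List the tables exposed by the MySQL catalog created above.
SHOW TABLES IN mysql.test_cdc;

-- Inspect the Flink schema derived from the MySQL table.
DESCRIBE mysql.test_cdc.runoob_tbl;
```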
Step 3: Create a StarRocks sink table using VVP
In the Realtime Compute console, create a streaming job.
In the Create File dialog box, set Engine Version to vvr-8.0.11-flink-1.17 and click Create.
Copy the following code to the job editor.
CREATE TEMPORARY TABLE sr_result (
  runoob_id BIGINT,
  runoob_title VARCHAR,
  runoob_author VARCHAR,
  submission_date DATE,
  add_col INT,
  PRIMARY KEY (runoob_id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030',
  'load-url' = '192.168.**.**:8030',
  'database-name' = 'test_cdc',
  'table-name' = 'runoob_tbl1',
  'username' = 'test',
  'password' = '******',
  'sink.buffer-flush.interval-ms' = '5000',
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01'
);

INSERT INTO sr_result
SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col
FROM mysql.test_cdc.`runoob_tbl`;
Parameter | Description |
connector | The connector type. Set the value to `starrocks`. |
jdbc-url | The JDBC URL used to run queries in StarRocks. Example: `jdbc:mysql://192.168.**.**:9030`, where `192.168.**.**` is the internal IP address of the StarRocks cluster. |
load-url | The IP address and HTTP port of the FE, in the format `Internal IP address of the StarRocks cluster:Port`. This example uses port 8030. Select the port based on the version of your cluster: 18030 for EMR-5.9.0 or later and EMR-3.43.0 or later; 8030 for EMR-5.8.0 or earlier and EMR-3.42.0 or earlier. For more information about access ports, see UI and ports. |
database-name | The name of the database in StarRocks. Enter the name of the database that you created in Step 1: Prepare test data. This topic uses `test_cdc` as an example. |
table-name | The name of the table in StarRocks. Enter the name of the table that you created in Step 1: Prepare test data. This topic uses `runoob_tbl1` as an example. |
username | The username for StarRocks. Enter the username that you created in Step 1: Prepare test data. This topic uses `test` as an example. |
password | The password for StarRocks. Enter the password that you set in Step 1: Prepare test data. This topic uses `123456` as an example. |
sink.buffer-flush.interval-ms | The buffer flush interval, in milliseconds. Valid values: 1000 to 3600000. |
sink.properties.row_delimiter | A custom row delimiter. |
sink.properties.column_separator | A custom column delimiter. |
Important: If you set `sink.semantic` to `exactly-once`, you must enable checkpointing, and the checkpoint interval should not be too long. In this mode, data becomes visible in StarRocks only after a checkpoint completes, and until then it is buffered in Flink memory.
By default, data is imported in CSV format. You can set `'sink.properties.row_delimiter' = '\x02'` and `'sink.properties.column_separator' = '\x01'` to customize the row and column delimiters. The `sink.properties.row_delimiter` parameter is supported in StarRocks 1.15.0 and later.
Click Deep Check to check the syntax.
After the syntax is verified, click Deploy.
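The Important note in this step describes the exactly-once mode. A minimal sketch of the sink with that mode enabled might look as follows. The table name `sr_result_eo` and the 60-second checkpoint interval are illustrative assumptions, and in Realtime Compute you can also set the checkpoint interval in the deployment configuration instead of with `SET`.

```sql
-- Illustrative checkpoint interval; data becomes visible in StarRocks
-- only after each checkpoint completes, so keep this reasonably short.
SET 'execution.checkpointing.interval' = '60s';

CREATE TEMPORARY TABLE sr_result_eo (
  runoob_id BIGINT,
  runoob_title VARCHAR,
  runoob_author VARCHAR,
  submission_date DATE,
  add_col INT,
  PRIMARY KEY (runoob_id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030',
  'load-url' = '192.168.**.**:8030',
  'database-name' = 'test_cdc',
  'table-name' = 'runoob_tbl1',
  'username' = 'test',
  'password' = '******',
  -- Exactly-once delivery; requires checkpointing to be enabled.
  'sink.semantic' = 'exactly-once'
);

INSERT INTO sr_result_eo
SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col
FROM mysql.test_cdc.`runoob_tbl`;
```

Use this variant in place of the `sr_result` job rather than alongside it, so that only one job writes to `runoob_tbl1`.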
Step 4: Start the job using VVP
In the Realtime Compute console, go to the Job O&M page.
Click Start in the Actions column of the target job.
In the dialog box that appears, click Start.
When the job status changes to Running, the job is running as expected and starts synchronizing data from MySQL to StarRocks.
Step 5: Run a demo
Query data
Log on to the StarRocks cluster using SSH. For more information, see Log on to a cluster.
Execute the following command to connect to the StarRocks cluster.
mysql -h127.0.0.1 -P 9030 -uroot
In the StarRocks connection window, execute the following statements to view the table data.
use test_cdc;
select * from runoob_tbl1;
The following output indicates that the data from MySQL has been synchronized to StarRocks.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | first        | tom           | 2025-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
Query data after an insert operation
In the ApsaraDB RDS database window, execute the following statement to insert a row.
INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`) VALUES (1, 'second', 'tom2', '2025-06-23', 1);
In the StarRocks connection window, execute the following statement to view the table data.
select * from runoob_tbl1;
The following output indicates that the data has been inserted.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2025-06-23      |       1 |
|        18 | first        | tom           | 2025-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
Synchronize data updates
In the ApsaraDB RDS database window, execute the following statement to update a row.
update runoob_tbl set runoob_title = 'new' where runoob_id = 18;
In the StarRocks connection window, execute the following statement to view the table data.
select * from runoob_tbl1;
The following output shows that the data has been updated.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2025-06-23      |       1 |
|        18 | new          | tom           | 2025-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
Synchronize data deletions
In the ApsaraDB RDS database window, execute the following statement to delete a row.
DELETE FROM runoob_tbl WHERE runoob_id = 1;
In the StarRocks connection window, execute the following statement to view the table data.
select * from runoob_tbl1;
The following output confirms that the data has been deleted.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | new          | tom           | 2025-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
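Beyond checking individual rows, a simple way to compare both sides after a series of changes is to run the same aggregate query on the source and destination tables. This is a rough sketch, not a full data-validation method: matching counts and sums make a mismatch unlikely but do not prove that every column is identical.

```sql
-- Run on ApsaraDB RDS for MySQL (source).
SELECT COUNT(*) AS row_cnt, SUM(runoob_id) AS id_sum, SUM(add_col) AS add_col_sum
FROM test_cdc.runoob_tbl;

-- Run on StarRocks (destination); the results should match the source.
SELECT COUNT(*) AS row_cnt, SUM(runoob_id) AS id_sum, SUM(add_col) AS add_col_sum
FROM test_cdc.runoob_tbl1;
```

With the data left after the deletion above (a single row with `runoob_id` 18 and `add_col` 3), both queries are expected to return 1, 18, and 3.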
Data type mapping between Flink and StarRocks
Flink data type | StarRocks data type |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON STRING |
ROW<arg T...> | JSON STRING |
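As an illustration of the mapping, the following hypothetical pair of tables declares a StarRocks table and a Flink sink whose columns follow the table above: TIMESTAMP maps to DATETIME, and MAP is written as a JSON string stored in a STRING column. The table name `type_demo` and its columns are made up for this sketch; the connection options reuse the masked values from Step 3.

```sql
-- StarRocks side (hypothetical table).
CREATE TABLE test_cdc.type_demo (
  `id` BIGINT NOT NULL COMMENT "",
  `event_time` DATETIME NULL COMMENT "",
  `attrs` STRING NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 8;

-- Flink side (hypothetical sink table).
CREATE TEMPORARY TABLE type_demo_sink (
  id BIGINT,
  event_time TIMESTAMP(3),    -- written to a DATETIME column
  attrs MAP<STRING, STRING>,  -- written as a JSON string
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030',
  'load-url' = '192.168.**.**:8030',
  'database-name' = 'test_cdc',
  'table-name' = 'type_demo',
  'username' = 'test',
  'password' = '******'
);
```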
FAQ
Q: What should I do if a time zone inconsistency occurs when I import data into StarRocks?
A: Add the `server-time-zone` option to the MySQL source table as a dynamic table option hint in the INSERT INTO statement, and set it to the time zone of the MySQL server. The following code provides an example.
INSERT INTO sr_result SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col from mysql.test_cdc.`runoob_tbl` /*+ OPTIONS('server-time-zone'='Asia/Shanghai') */;
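To choose the value for `server-time-zone`, you can check which time zone the MySQL server uses. This sketch uses standard MySQL system variables. When `@@global.time_zone` returns `SYSTEM`, the server follows the operating system time zone reported by `@@system_time_zone`, which is often an abbreviation such as `CST`; in that case, map it to an IANA name such as `Asia/Shanghai` yourself before using it in the hint.

```sql
-- Run on ApsaraDB RDS for MySQL.
SELECT @@global.time_zone, @@session.time_zone, @@system_time_zone;
```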