This topic describes how to use the Flink service in an EMR Dataflow cluster to synchronize data from MySQL to EMR Serverless StarRocks by using the CREATE TABLE AS (CTAS) statement.
Background information
You can use the CTAS or CREATE DATABASE AS (CDAS) statement to synchronize data from MySQL to EMR Serverless StarRocks. The CTAS statement can synchronize the schema and data of a single table, and the CDAS statement can synchronize the schema and data of an entire database or multiple tables in the same database. This topic uses the CTAS statement. The CDAS statement is used in a similar way to the CTAS statement. For more information, see CDAS introduction.
By using the CREATE TABLE AS (CTAS) statement, you can automatically create a table in StarRocks with the same schema as the table in MySQL and synchronize data. You can also synchronize schema changes of the source table to the destination table in real time, which improves the efficiency of creating tables in the destination storage and maintaining schema changes of the source table.
When you execute the CTAS statement, Flink performs the following operations:
Check whether the destination table exists in the destination storage.
If the destination table does not exist, the corresponding destination table is created in the destination storage based on the catalog to which the destination table belongs. The destination table has the same schema as the source table.
If the destination table exists, Flink skips the table creation step. If the schema of the existing destination table is different from the schema of the source table, an error message is returned.
Commit and run a data synchronization job. Flink synchronizes data and changes in the schema of the source table to the destination table.
In addition to synchronizing data in real time, the CTAS statement synchronizes schema changes of the source table to the destination table.
Schema changes include table creation and schema changes after a table is created.
The following schema changes are supported:
Add a nullable column: The statement automatically adds the related column to the end of the schema of the destination table and synchronizes data to the added column.
Delete a nullable column: The statement automatically fills the nullable column of the destination table with null values instead of deleting the column from the table.
Rename a column: The statement adds the renamed column to the end of the destination table and fills the column before renaming with null values.
For example, if the name of the col_a column in the source table is changed to col_b, the col_b column is added to the end of the destination table and the col_a column is automatically filled with null values.
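The three supported changes correspond to ordinary MySQL DDL statements on the source table. The following is a minimal sketch, assuming a hypothetical source table named demo_src with a nullable column col_a. The table and column names are illustrative and are not part of the test data in this topic. MySQL 5.7 has no RENAME COLUMN clause, so the rename uses CHANGE COLUMN.

```sql
-- Hypothetical MySQL source table (all names are assumptions for illustration).
CREATE TABLE IF NOT EXISTS demo_src (
  id    INT NOT NULL,
  col_a INT NULL,
  PRIMARY KEY (id)
);

-- Add a nullable column: the column is appended to the end of the destination schema.
ALTER TABLE demo_src ADD COLUMN remark VARCHAR(100) NULL;

-- Delete a nullable column: the destination keeps the column and fills it with NULL.
ALTER TABLE demo_src DROP COLUMN remark;

-- Rename a column: the destination gains col_b at the end, and col_a is filled with NULL.
ALTER TABLE demo_src CHANGE COLUMN col_a col_b INT NULL;
```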
The following schema changes are not supported:
Change of data types.
For example, the data in a column is changed from the VARCHAR type to the BIGINT type, or the column property is changed from NOT NULL to NULLABLE.
Change of constraints, such as the primary key or index.
Addition or deletion of a non-nullable column.
- If the schema of the source table has one of the preceding changes, you must delete the destination table and restart the job that executes the CTAS statement. This way, the destination table is created again and the historical data is resynchronized to the destination table.
- The CTAS statement does not identify the types of DDL statements, but compares the schema differences between the two data records before and after the schema is changed. Therefore, if you delete a column and then add the column again, and no data changes between the two DDL statements that are used to delete and add the column, the CTAS statement considers that no schema change occurs. Similarly, the CTAS statement does not trigger schema change synchronization even if you add a column to the source table. The statement identifies the schema change only when the data changes in the source table. In this case, the statement synchronizes the schema change to the destination table.
- For more information about the field types supported by the CTAS statement, see Continuously load data from Apache Flink®.
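The notes above can be illustrated with a short sketch. It assumes a hypothetical MySQL source table demo_src (columns id and col_b) that is synchronized to a StarRocks table of the same name in the test_cdc database. All object names are assumptions. The first part shows that a supported change is identified only after the next data change. The second part shows unsupported changes followed by the cleanup on the StarRocks side.

```sql
-- Run in MySQL. Supported change: the CTAS job does not identify the new column yet.
ALTER TABLE demo_src ADD COLUMN note VARCHAR(50) NULL;
-- The first data change after the DDL statement lets the job identify the new column.
INSERT INTO demo_src (id, col_b, note) VALUES (1, 10, 'first row after DDL');

-- Run in MySQL. Unsupported changes: a data type change and a new non-nullable column.
ALTER TABLE demo_src MODIFY COLUMN col_b BIGINT NULL;
ALTER TABLE demo_src ADD COLUMN qty INT NOT NULL DEFAULT 0;

-- Run in StarRocks. Delete the destination table, then restart the job that executes
-- the CTAS statement so that the table is created again and historical data is resynchronized.
DROP TABLE IF EXISTS test_cdc.demo_src;
```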
Prerequisites
A Dataflow cluster is created in the new console, and the Flink service is selected. For more information, see Create a cluster.
An EMR Serverless StarRocks instance is created. For more information, see Create an instance.
An ApsaraDB RDS for MySQL instance is created. For more information, see Step 1: Create an ApsaraDB RDS for MySQL instance and configure a database.
This topic uses MySQL 5.7 and a Dataflow cluster of EMR-3.42.0 as an example.
Limits
The Dataflow cluster, StarRocks instance, and ApsaraDB RDS for MySQL instance must be deployed in the same virtual private cloud (VPC).
The Dataflow cluster and StarRocks instance must be accessible over the Internet.
The engine version of the ApsaraDB RDS for MySQL instance must be 5.7 or later.
The Dataflow cluster must be of EMR-3.42.0 or later or EMR-5.8.0 or later.
Step 1: Prepare test data
Create a test database and a test account. For more information, see Step 1: Create an ApsaraDB RDS for MySQL instance and configure a database.
After you create the test database and account, grant the read and write permissions to the account.
Note: In this topic, the database name is test_cdc and the account name is emr_test.
Connect to the ApsaraDB RDS for MySQL instance by using the test account. For more information, see Step 2: Connect to the ApsaraDB RDS for MySQL instance.
Run the following command to create a table:
use test_cdc;
CREATE TABLE IF NOT EXISTS `runoob_tbl`(
  `runoob_id` INT UNSIGNED AUTO_INCREMENT,
  `runoob_title` VARCHAR(100) NOT NULL,
  `runoob_author` VARCHAR(40) NOT NULL,
  `submission_date` DATE,
  `add_col` int DEFAULT NULL,
  PRIMARY KEY ( `runoob_id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3);
Log on to and connect to the EMR Serverless StarRocks instance. For more information, see Connect to a StarRocks instance by using a MySQL client.
Run the following commands to create the test_cdc database and a user named test (example password: 1qaz!QAZ). The user can be a super administrator, or a regular user to which you grant permissions on the database, as in the following commands. For more information, see Manage users.
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED by '1qaz!QAZ'; GRANT ALL on test_cdc to test;
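Before you continue, it can help to confirm that both sides are ready. The following statements are an optional sketch and are not part of the original procedure. The binary log checks rest on the assumption that the mysql-cdc connector reads changes from the MySQL binary log and therefore needs log_bin set to ON and binlog_format set to ROW.

```sql
-- Run in the MySQL connection (account emr_test).
SHOW VARIABLES LIKE 'log_bin';        -- the CDC source is assumed to need ON
SHOW VARIABLES LIKE 'binlog_format';  -- the CDC source is assumed to need ROW
SELECT * FROM test_cdc.runoob_tbl;    -- shows the test row inserted earlier in this step

-- Run in the StarRocks connection.
SHOW DATABASES LIKE 'test_cdc';       -- confirms that the database was created
```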
Step 2: Upload custom connectors
Upload custom connectors for Flink, StarRocks, and ApsaraDB RDS for MySQL connections.
Log on to the Dataflow cluster by using SSH. For more information, see Log on to a cluster.
Download flink-connector-starrocks-1.2.2_flink-1.13_2.11.jar and ververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar, and upload them to the /opt/apps/FLINK/flink-current/lib directory of the Dataflow cluster.
Step 3: Execute the CTAS statement
Submit a job in session mode.
Log on to the Dataflow cluster by using SSH. For more information, see Log on to a cluster.
Run the following command to go to the /opt/apps/FLINK/flink-current directory:
cd /opt/apps/FLINK/flink-current
Run the following command to start a YARN session:
./bin/yarn-session.sh --detached
If the command is successfully executed, application_XXXX_YY is returned in the output. This is the sessionId that you need to use to log on to the SQL client.
Run the following command to open the SQL client:
./bin/sql-client.sh -s <application_XXXX_YY>
Note: Replace <application_XXXX_YY> with the sessionId that you obtained in the previous step.
Create catalogs for MySQL and StarRocks.
CREATE CATALOG sr WITH (
  'type' = 'starrocks',
  'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
  'username' = 'test',
  'password' = '1qaz!QAZ',
  'dbname' = 'test_cdc'
);
CREATE CATALOG mysql WITH (
  'type' = 'mysql',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'emr_test',
  'password' = '123456',
  'default-database' = 'test_cdc'
);
The following tables describe the parameters. You can modify the parameters based on your business requirements.
Table 1. StarRocks catalog parameters
Parameter | Description
type | The type of the catalog. Set the value to starrocks.
endpoint | The internal endpoint and query port of the FE node. The format is <Internal endpoint of the FE node of the EMR Serverless StarRocks instance>:9030. Example: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. Note: For information about how to obtain the internal endpoint of the FE node of an EMR Serverless StarRocks instance, see View the instance list and details.
username | The username that is used to access the StarRocks database. Enter the username that you created in Step 1: Prepare test data. In this example, test is used.
password | The password that is used to access the StarRocks database. Enter the password that you set for the account in Step 1: Prepare test data. In this example, 1qaz!QAZ is used.
dbname | The name of the StarRocks database. Enter the database name that you created in Step 1: Prepare test data. In this example, test_cdc is used.
Table 2. MySQL catalog parameters
Parameter | Description
type | The type of the catalog. Set the value to mysql.
hostname | The internal endpoint of the ApsaraDB RDS for MySQL instance. You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console. Example: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.
port | The port number of the MySQL database. Default value: 3306.
username | The username that is used to access the MySQL database. Enter the username of the account that you created in Step 1: Prepare test data. In this example, emr_test is used.
password | The password that is used to access the MySQL database. Enter the password of the account that you created in Step 1: Prepare test data. In this example, 123456 is used.
default-database | The name of the default MySQL database. Enter the database name that you created in Step 1: Prepare test data. In this example, test_cdc is used.
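After the catalogs are created, you can check them in the same SQL client session before you run the CTAS statement. This is a sketch that uses standard Flink SQL statements. It assumes the catalog names sr and mysql from the preceding example, and it assumes that the mysql catalog exposes the test_cdc database and its tables.

```sql
-- List the registered catalogs; sr and mysql are the names used in this topic.
SHOW CATALOGS;

-- Switch to the MySQL catalog and browse the source database.
USE CATALOG mysql;
SHOW DATABASES;
USE test_cdc;
SHOW TABLES;   -- the source table runoob_tbl is the one to look for
```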
Execute the CTAS statement in the StarRocks catalog.
You can use one of the following three methods to execute the CTAS statement:
At-least-once semantics: You can use the sink.buffer-flush.interval-ms parameter to configure the interval at which data is written to StarRocks. The advantage is that the write interval is short and less memory is used.
use CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
  'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
  'database-name'='test_cdc',
  'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
  'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
  'table-name'='runoob_tbl_sr',
  'username'='test',
  'password' = '1qaz!QAZ',
  'sink.buffer-flush.interval-ms' = '5000',
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01'
)
as table mysql.test_cdc.runoob_tbl /*+ OPTIONS (
  'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'emr_test',
  'password' = '123456',
  'database-name' = 'test_cdc',
  'table-name' = 'runoob_tbl'
)*/;
Exactly-once semantics: You must specify the interval at which checkpoints are periodically scheduled. The advantage is that data is not lost or duplicated when errors occur. The disadvantage is that the checkpoint interval determines when the data is visible. For more information, see Checkpointing.
set 'execution.checkpointing.interval' = '1 min';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.timeout' = '10 min';
use CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
  'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
  'database-name'='test_cdc',
  'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
  'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
  'table-name'='runoob_tbl',
  'username'='test',
  'password' = '1qaz!QAZ',
  'sink.semantic' = 'exactly-once',
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01'
)
as table mysql.test_cdc.runoob_tbl /*+ OPTIONS (
  'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'emr_test',
  'password' = '123456',
  'database-name' = 'test_cdc',
  'table-name' = 'runoob_tbl'
)*/;
Simple mode: The advantage is that you do not need to specify the fields of the MySQL table when you create the destination table. The schema of the table that you create is the same as the schema of the table in the MySQL database, which makes this mode easy to use. The disadvantage is that you cannot create partitions. For tables that need to be partitioned, you must use normal mode.
use CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
  'starrocks.create.table.properties'='buckets 8',
  'starrocks.create.table.mode'='simple',
  'database-name'='test_cdc',
  'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
  'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
  'table-name'='runoob_tbl_sr',
  'username'='test',
  'password' = '1qaz!QAZ',
  'sink.buffer-flush.interval-ms' = '5000',
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01'
)
as table mysql.test_cdc.runoob_tbl /*+ OPTIONS (
  'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'emr_test',
  'password' = '123456',
  'database-name' = 'test_cdc',
  'table-name' = 'runoob_tbl'
)*/;
Table 3. WITH parameters
Parameter | Required | Description
starrocks.create.table.properties | Yes | The clauses that follow the column definitions in the statement that is used to create a table in the StarRocks database, such as engine, key, and buckets in the sample code.
database-name | Yes | The name of the StarRocks database. In this example, test_cdc is used.
jdbc-url | Yes | The JDBC URL that is used to connect to StarRocks and perform queries in StarRocks. Example: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. In this example, fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com is the internal endpoint of the FE node of the EMR Serverless StarRocks instance. Note: For information about how to obtain the internal endpoint of the FE node of an EMR Serverless StarRocks instance, see View the instance list and details.
load-url | Yes | The internal endpoint and HTTP port of the FE node. The format is <Internal endpoint of the FE node of the EMR Serverless StarRocks instance>:8030. Example: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030. Note: For information about how to obtain the internal endpoint of the FE node of an EMR Serverless StarRocks instance, see View the instance list and details.
sink.semantic | No | The semantics that is used to execute the statement. Set this parameter to exactly-once to ensure data consistency. Default value: at-least-once.
starrocks.create.table.mode | No | Valid values: normal (default): You must specify complete configurations such as engine, key, and buckets in the starrocks.create.table.properties parameter, as shown in the example. simple: By default, the engine parameter is set to olap and the key parameter is set to primary key. The primary key is the same as the primary key in the MySQL table. The distributed by hash parameter is configured for all primary keys, and no partition exists. You must specify buckets in the starrocks.create.table.properties parameter. You can also specify optional parameters such as properties.
sink.properties.row_delimiter | No | The custom row delimiter.
sink.properties.column_separator | No | The custom column delimiter.
Note: If you use a Flink version earlier than vvr-6.0.5-flink-1.15, you must add 'sink.use.new-apiapi'='false', to the WITH clause. For more information about other configurations, see Continuously load data from Apache Flink.
Table 4. OPTIONS parameters
Parameter | Description
connector | The type of the connector. Set the value to mysql-cdc.
hostname | The internal endpoint of the ApsaraDB RDS for MySQL instance. You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console. Example: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.
port | The port number of the MySQL database. Default value: 3306.
username | The username that is used to access the MySQL database. Enter the username of the account that you created in Step 1: Prepare test data. In this example, emr_test is used.
password | The password that is used to access the MySQL database. Enter the password of the account that you created in Step 1: Prepare test data.
table-name | The name of the source table in the MySQL database. Enter the name of the table that you created in Step 1: Prepare test data. In this example, runoob_tbl is used.
database-name | The name of the MySQL database. Enter the database name that you created in Step 1: Prepare test data. In this example, test_cdc is used.
Step 4: View the data synchronization result
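If checkpointing is enabled, the checkpoint interval determines when synchronized data becomes visible in StarRocks, so you may need to wait up to approximately one checkpoint interval before the results appear.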
Query data
Log on to and connect to the EMR Serverless StarRocks instance. For more information, see Connect to a StarRocks instance by using a MySQL client.
Run the following command in the StarRocks connection window to view the table data:
use test_cdc;
select * from runoob_tbl1;
The following output is returned. This indicates that data is synchronized from the ApsaraDB RDS for MySQL instance to the StarRocks cluster.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | first        | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
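To confirm that the destination table was created with the same schema as the source table, you can also inspect its definition in the StarRocks connection window. A minimal sketch, assuming the destination table is named runoob_tbl1 as in the preceding query:

```sql
USE test_cdc;
-- List the columns, types, and key information of the destination table.
DESC runoob_tbl1;
-- Show the full CREATE TABLE statement, including the engine, key, and bucket settings.
SHOW CREATE TABLE runoob_tbl1;
```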
Query inserted data
Run the following command in the ApsaraDB RDS for MySQL database window to insert data:
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1);
Run the following command in the StarRocks connection window to view the table data:
select * from runoob_tbl1;
The following output is returned. This indicates that the data is inserted.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | first        | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
Synchronize updated data
Run the following command in the ApsaraDB RDS for MySQL database window to update the specified data:
update runoob_tbl set runoob_title= 'new' where runoob_id = 18;
Run the following command in the StarRocks connection window to view the table data:
select * from runoob_tbl1;
The following output is returned. This indicates that the updated data is synchronized.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
Synchronize deleted data
Run the following command in the ApsaraDB RDS for MySQL database window to delete the specified data:
DELETE FROM runoob_tbl WHERE runoob_id = 1;
Run the following command in the StarRocks connection window to view the table data:
select * from runoob_tbl1;
The following output is returned. This indicates that the deleted data is synchronized.
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+
Add nullable columns
Run the following command in the ApsaraDB RDS for MySQL database window to add a nullable column:
alter table `runoob_tbl` add COLUMN `add_col2` INT;
Run the following command to insert data:
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2);
Run the following command in the StarRocks connection window to view the table data:
select * from runoob_tbl1;
The following output is returned. This indicates that the schema is changed and the nullable column is added.
+-----------+--------------+---------------+-----------------+---------+----------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
+-----------+--------------+---------------+-----------------+---------+----------+
|         1 | second       | tom2          | 2022-06-23      |       1 |        2 |
|        18 | new          | tom           | 2022-06-22      |       3 |     NULL |
+-----------+--------------+---------------+-----------------+---------+----------+
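The other supported change, deleting a nullable column, can be tried in the same way. The following is a sketch only: the inserted row values are made up for illustration, and the expected result is inferred from the schema change rules described earlier in this topic (the destination keeps the column and fills it with null values), not from a recorded run.

```sql
-- Run in the ApsaraDB RDS for MySQL database window: delete the nullable column,
-- then change data so that the job can identify the schema change.
ALTER TABLE `runoob_tbl` DROP COLUMN `add_col2`;
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) VALUES (2,'third','tom3','2022-06-24',5);

-- Run in the StarRocks connection window: add_col2 should still exist in the
-- destination table, with NULL for the row written after the column was deleted.
SELECT runoob_id, add_col2 FROM runoob_tbl1 ORDER BY runoob_id;
```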
CDAS introduction
The CREATE DATABASE AS (CDAS) statement is syntactic sugar for the CTAS statement. By using the CDAS statement, you can synchronize an entire database from MySQL. The statement generates a Flink job whose source is a database in MySQL and whose destinations are multiple tables in StarRocks. You can also use the including table syntax to select only some tables in the database for synchronization.
As with the CTAS statement, you must create catalogs for MySQL and StarRocks before you execute the CDAS statement. Sample statement:
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username'='test',
'password' = '1qaz!QAZ',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table
'tbl1','tbl2','tbl3' /*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'emr_test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;
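After the CDAS job starts, you can check the result in the StarRocks connection window. A minimal sketch, assuming the destination database sr_db from the sample statement and a synchronized table named tbl2 from its including table clause:

```sql
-- List the tables that the CDAS job created in the destination database.
SHOW TABLES FROM sr_db;
-- Spot-check one synchronized table by counting its rows.
SELECT COUNT(*) FROM sr_db.tbl2;
```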