MaxCompute provides a new version of the Flink Change Data Capture (CDC) connector. You can use the Flink CDC connector to synchronize data from a data source, such as MySQL, to MaxCompute standard tables or Delta tables in real time. This topic describes how to use the new version of the Flink CDC connector to synchronize data to MaxCompute.
Background information of Flink CDC
Flink CDC is an end-to-end open source tool for real-time data integration. It defines a set of fully functional APIs and provides an extract, transform, and load (ETL) task framework to process data. You can run Apache Flink jobs to use the features provided by Flink CDC. For more information, see Welcome to Flink CDC. Deeply integrated with and powered by Apache Flink, Flink CDC provides the following core features:
Provides an end-to-end data integration framework.
Provides APIs for data integration users to build jobs in an efficient manner.
Processes multiple tables in a source or sink.
Supports synchronization of all data from a database.
Supports schema evolution.
Prerequisites
A MaxCompute project is created. For more information, see Create a MaxCompute project.
Precautions
The Flink CDC connector can automatically create tables. When you use the connector to synchronize data, the table locations and data types of the source table are automatically mapped to the MaxCompute table. If the source table has a primary key, a Delta table is automatically created. If the source table does not have a primary key, a MaxCompute standard table is created (see the example at the end of this section). For more information about the mappings of locations and data types between the source table and the MaxCompute table, see Table location mappings and Data type mappings.
If data is written to a MaxCompute standard table, DELETE operations are ignored and UPDATE operations are processed as INSERT operations. Only at-least-once semantics is supported. Idempotent writes can be implemented for a Delta table based on the primary key of the Delta table.
Schema changes of the source table can be synchronized to the MaxCompute table.
A new column can only be appended to the MaxCompute table as the last column.
You can change the data type of a column only to a data type that is compatible with the original data type. For more information about the conversion between data types, see Change the data type of a column.
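For reference, the following SQL sketch shows the kind of table that the connector can create in each case, based on the orders source table that is used later in this topic. This is a hedged example: the exact DDL, data types, and table properties that the connector generates may differ.

-- The app_db.orders source table has a primary key (id), so a Delta table is created.
-- The bucket number shown here only mirrors the bucketsNum value that is used later in this topic.
CREATE TABLE IF NOT EXISTS orders (
  id    INT NOT NULL PRIMARY KEY,
  price DECIMAL(10,2)
) TBLPROPERTIES ("transactional"="true", "write.bucket.num"="8");

-- If a source table has no primary key, a standard table is created instead.
-- In this case, DELETE operations are ignored and UPDATE operations are written as INSERT operations.
CREATE TABLE IF NOT EXISTS orders_without_pk (
  id    INT,
  price DECIMAL(10,2)
);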
Getting started
This section describes how to develop a streaming ETL job that synchronizes change data from MySQL to MaxCompute by using a Flink CDC pipeline. The pipeline can synchronize all data in a database, synchronize schema changes of tables, and synchronize data from tables in sharded databases.
Prepare environments
Prepare a Flink cluster that is deployed in standalone mode
Download the flink-1.18.0-bin-scala_2.12.tgz package and decompress the package to obtain the flink-1.18.0 directory. Go to the flink-1.18.0 directory and run the following command to set the installation directory of flink-1.18.0 as FLINK_HOME:

export FLINK_HOME=$(pwd)

Run the vim flink-conf.yaml command in the $FLINK_HOME/conf directory, add the following parameters to the configuration file, and then save the file:

# Enable the checkpointing feature. Run a checkpoint every 3 seconds.
# This configuration is provided for testing purposes only. We recommend that you set the checkpoint interval to no less than 30 seconds for a job.
execution.checkpointing.interval: 3000
# flink-cdc-pipeline-connector-maxcompute relies on the communication mechanism of Flink for data synchronization.
# Increase the timeout period for Flink communication.
pekko.ask.timeout: 60s

Run the following command to start the Flink cluster:

./bin/start-cluster.sh

If the startup is successful, you can enter http://localhost:8081/ in the address bar of a web browser to access the Flink web UI. 8081 is the default port number.
You can run the start-cluster.sh command multiple times to run multiple TaskManagers in parallel.
Prepare a MySQL environment
In this example, Docker Compose is used to prepare a MySQL environment.
Create a file named docker-compose.yaml. The following code shows the file content:

version: '2.1'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

The following table describes the parameters.
Parameter | Description |
version | The version of the Docker Compose file format. |
image | The Docker image that is used for the MySQL service. Set the value to debezium/example-mysql:1.1. |
ports | The port mapping of the MySQL instance. |
environment | The environment variables of the MySQL instance, including the root password, username, and password. |
In this example, a MySQL database named app_db that stores order, shipment, and product information is used in the Docker Compose environment.
Run the following command in the directory in which the docker-compose.yaml file is stored to start the required components:
docker-compose up -d

This command starts all the containers that are defined in the Docker Compose configuration in detached mode. You can run the docker ps command to check whether the containers are started.
Prepare data in the MySQL database
Run the following command to access the MySQL container:
docker-compose exec mysql mysql -uroot -p123456

Create a database in MySQL and prepare table data.
Create a database.
CREATE DATABASE app_db;
USE app_db;

Prepare table data.
Create a table named orders and insert data into the table.
CREATE TABLE `orders` (
  `id` INT NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  PRIMARY KEY (`id`)
);
-- Insert data into the table.
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

Create a table named shipments and insert data into the table.
CREATE TABLE `shipments` (
  `id` INT NOT NULL,
  `city` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);
-- Insert data into the table.
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

Create a table named products and insert data into the table.
CREATE TABLE `products` (
  `id` INT NOT NULL,
  `product` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);
-- Insert data into the table.
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
Submit the YAML file by using the Flink CDC CLI
Download the required JAR packages:
flink-cdc package
Go to the flink-cdc page to download the binary package flink-cdc-3.1.1-bin.tar.gz and decompress the package to obtain the flink-cdc-3.1.1 directory. The flink-cdc-3.1.1 directory contains the bin, lib, log, and conf directories. Then, move the files in these directories to the corresponding directories of flink-1.18.0.

Connector packages
Download the connector packages, which include the MySQL pipeline connector and the MaxCompute pipeline connector, and move the packages to the flink-1.18.0/lib directory.

Note: The download links provide only released connector versions. If you want to use a SNAPSHOT version, you must compile the source code of the version based on the master or release branches on your on-premises machine.
Driver package
Download the MySQL Connector Java package and pass the package to the Flink CDC CLI by using the --jar parameter, or place the package in the flink-1.18.0/lib directory and restart the Flink cluster. This is required because CDC connectors no longer contain these drivers.
Write a YAML file for task configuration. The following sample code provides an example of the mysql-to-maxcompute.yaml file for database synchronization:

################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

# Configure the accessId, accessKey, endpoint, and project parameters.
sink:
  type: maxcompute
  name: MaxComputeSink
  accessId: ${your_accessId}
  accessKey: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  bucketsNum: 8

pipeline:
  name: Sync MySQL Database to MaxCompute
  parallelism: 1

Parameters:
For more information about the parameters in the source section, see MySQL Connector.
For more information about the parameters in the sink section, see Configuration items of the connector.
Run the following command to submit the task to the Flink cluster that is deployed in standalone mode:
./bin/flink-cdc.sh mysql-to-maxcompute.yaml

After the task is submitted, the following result is returned:

Pipeline has been submitted to cluster.
Job ID: f9f9689866946e25bf151ecc179ef46f
Job Description: Sync MySQL Database to MaxCompute

In the Flink web UI, a task named Sync MySQL Database to MaxCompute is running. Run the following commands in MaxCompute to check whether the orders, shipments, and products tables are created and data is written to the tables.
-- Query the orders table.
read orders;
-- The following result is returned:
+------------+------------+
| id         | price      |
+------------+------------+
| 1          | 4          |
| 2          | 100        |
+------------+------------+

-- Query the shipments table.
read shipments;
-- The following result is returned:
+------------+------------+
| id         | city       |
+------------+------------+
| 1          | beijing    |
| 2          | xian       |
+------------+------------+

-- Query the products table.
read products;
-- The following result is returned:
+------------+------------+
| id         | product    |
+------------+------------+
| 3          | Peanut     |
| 1          | Beer       |
| 2          | Cap        |
+------------+------------+
Synchronize changes in real time
In this example, the orders table is used. When data in the source table in the MySQL database is modified, the data in the destination MaxCompute table is also modified in real time.
Run the following command to access the MySQL container:
docker-compose exec mysql mysql -uroot -p123456

Insert a data record into the orders table of MySQL.

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

+------------+------------+
| id         | price      |
+------------+------------+
| 3          | 100        |
| 1          | 4          |
| 2          | 100        |
+------------+------------+

Add a field to the orders table of MySQL.

ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

+------------+------------+------------+
| id         | price      | amount     |
+------------+------------+------------+
| 3          | 100        | NULL       |
| 1          | 4          | NULL       |
| 2          | 100        | NULL       |
+------------+------------+------------+

Update a data record in the orders table of MySQL.

UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

+------------+------------+------------+
| id         | price      | amount     |
+------------+------------+------------+
| 3          | 100        | NULL       |
| 1          | 100        | 100.00     |
| 2          | 100        | NULL       |
+------------+------------+------------+

Delete a data record from the orders table of MySQL.

DELETE FROM app_db.orders WHERE id=2;

Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

+------------+------------+------------+
| id         | price      | amount     |
+------------+------------+------------+
| 3          | 100        | NULL       |
| 1          | 100        | 100.00     |
+------------+------------+------------+
Each time you perform an operation in MySQL, you can preview the data in MaxCompute and see that the data in the orders table is updated in real time.
Route the changes
Flink CDC provides the feature to route the schema or data of a source table to other table names. You can use this feature to perform operations such as table or database name replacement, and database synchronization. The following sample code provides an example.
################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

# Configure the accessId, accessKey, endpoint, and project parameters.
sink:
  type: maxcompute
  name: MaxComputeSink
  accessId: ${your_accessId}
  accessKey: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  bucketsNum: 8

route:
  - source-table: app_db.orders
    sink-table: ods_db.ods_orders
  - source-table: app_db.shipments
    sink-table: ods_db.ods_shipments
  - source-table: app_db.products
    sink-table: ods_db.ods_products

pipeline:
  name: Sync MySQL Database to MaxCompute
  parallelism: 1

For more information about the parameters in the route section, see Route.
The preceding configuration in the route section is used to synchronize the schema and data of the app_db.orders table to the ods_db.ods_orders table. This way, database migration is implemented. You can use regular expressions to match multiple tables in source-table to synchronize data from multiple tables in a sharded database. Sample code:
route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

In this case, data of tables such as app_db.order01, app_db.order02, and app_db.order03 can be aggregated into the ods_db.ods_orders table.
Scenarios in which duplicate primary key values exist across multiple source tables are not supported. Support for such scenarios will be added in later connector versions.
Clear the environments
After you perform the preceding operations, you must clear the environments.
Run the following command in the directory in which the docker-compose.yaml file is stored to stop all containers:

docker-compose down

Run the following command in the flink-1.18.0 directory in which Flink resides to stop the Flink cluster:
./bin/stop-cluster.sh
Appendixes
Configuration items of the connector
Parameter | Required | Default value | Data type | Description |
type | Yes | none | String | The connector that you want to use. Set the value to maxcompute. |
name | No | none | String | The name of the sink. |
accessId | Yes | none | String | The AccessKey ID of your Alibaba Cloud account or RAM user. You can obtain the AccessKey ID from the AccessKey Pair page. |
accessKey | Yes | none | String | The AccessKey secret that corresponds to the AccessKey ID. |
endpoint | Yes | none | String | The endpoint of MaxCompute. You must configure this parameter based on the region and network connection method that you selected when you create the MaxCompute project. For more information about the endpoints that are used in different regions and different network connection modes, see Endpoints. |
project | Yes | none | String | The name of the MaxCompute project. To obtain the name of the MaxCompute project, perform the following steps: Log on to the MaxCompute console. In the left-side navigation pane, choose Workspace > Projects to view the name of the MaxCompute project. |
tunnelEndpoint | No | none | String | The endpoint of the MaxCompute Tunnel service. In most cases, the endpoint supports automatic routing based on the region in which the project resides. The endpoint is used only in a special network environment, such as a network environment in which a proxy is used. |
quotaName | No | none | String | The name of the exclusive resource group for MaxCompute Tunnel. If this parameter is not configured, a shared resource group is used. |
stsToken | No | none | String | This parameter is required if you use a Security Token Service (STS) token issued by a RAM role for authentication. |
bucketsNum | No | 16 | Integer | The number of buckets that are used to automatically create a MaxCompute Delta table. For more information, see Near-real-time data warehouse overview. |
compressAlgorithm | No | zlib | String | The compression algorithm that is used when data is written to MaxCompute. Valid values: |
totalBatchSize | No | 64MB | String | The maximum amount of data in the buffer of a partition or a non-partitioned table. The buffers of different partitions or non-partitioned tables are independent of each other. When data in the buffer exceeds the value of this parameter, data is written to MaxCompute. |
bucketBatchSize | No | 4MB | String | The maximum amount of data in the buffer of a bucket. This parameter takes effect only if you write data to Delta tables. The buffers of different buckets are independent of each other. When data in the buffer exceeds the value of this parameter, data is written to MaxCompute. |
numCommitThreads | No | 16 | Integer | The maximum number of partitions or tables that can be processed at the same time during the checkpointing operation. |
numFlushConcurrent | No | 4 | Integer | The maximum number of buckets that can be written to MaxCompute at the same time. This parameter takes effect only if you write data to Delta tables. |
retryTimes | No | 3 | Integer | The maximum number of retries when a network connection error occurs. |
sleepMillis | No | true | Long | The waiting duration for each retry when a network connection error occurs. Unit: milliseconds. |
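The following sink configuration is a hedged sketch that shows how some of the optional parameters in the preceding table can be combined with the required parameters. The placeholder values, such as ${your_accessId} and ${your_tunnel_quota}, and the specific values chosen for the optional parameters are examples only; adjust them for your environment.

sink:
  type: maxcompute
  name: MaxComputeSink
  accessId: ${your_accessId}
  accessKey: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  # Optional parameters. The following values are examples only.
  quotaName: ${your_tunnel_quota}
  bucketsNum: 8
  compressAlgorithm: zlib
  totalBatchSize: 64MB
  numCommitThreads: 16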
Table location mappings
When the Flink CDC connector automatically creates a table, the location information of the source table is mapped to the MaxCompute table based on the mappings described in the following table.
If a MaxCompute project does not support the schema model, each synchronization task can synchronize data from only one MySQL database. For other data sources, the Flink CDC connector ignores the TableId.namespace information. For an example of how these mappings affect table names, see the sample queries after the following table.
Object in Flink CDC | Location in MaxCompute | Location in MySQL |
Project in the configuration file | Project | none |
TableId.namespace | Schema (This configuration is supported only when the MaxCompute project supports the schema model. If the MaxCompute project does not support the schema model, this configuration is ignored.) | Database |
TableId.tableName | Table | Table |
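For example, assume that the app_db.orders table from this topic has been synchronized to a project named your_project (a placeholder). Based on the preceding mappings, the table can typically be queried as follows:

-- If the MaxCompute project supports the schema model, the MySQL database name app_db is mapped to a schema.
SELECT * FROM your_project.app_db.orders;
-- If the project does not support the schema model, the database name is ignored and the table is created directly in the project.
SELECT * FROM your_project.orders;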
Data type mappings
Flink data type | MaxCompute data type |
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
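As an illustration of the preceding mapping, consider a hypothetical source table whose Flink schema contains an INT column, a VARCHAR column, a DATE column, and a TIMESTAMP_WITHOUT_TIME_ZONE column. The automatically created MaxCompute table is expected to look similar to the following sketch; the table and column names are hypothetical, and the actual DDL generated by the connector may differ.

-- Hypothetical MaxCompute table that corresponds to a Flink schema of
-- (id INT, name VARCHAR(255), birthday DATE, updated_at TIMESTAMP(3) without time zone).
CREATE TABLE IF NOT EXISTS demo_table (
  id         INT,            -- INTEGER maps to INTEGER
  name       STRING,         -- CHAR and VARCHAR map to STRING
  birthday   DATE,           -- DATE maps to DATE
  updated_at TIMESTAMP_NTZ   -- TIMESTAMP_WITHOUT_TIME_ZONE maps to TIMESTAMP_NTZ
);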