MaxCompute provides a Flink Change Data Capture (CDC) connector that synchronizes change data from sources such as MySQL to MaxCompute standard tables or Delta tables in real time. This topic explains how to use the Flink CDC pipeline connector to set up a streaming extract, transform, and load (ETL) pipeline from MySQL to MaxCompute.
The connector supports:

- Automatic table creation based on the source table structure
- Schema evolution: changes to the source table schema are propagated to MaxCompute
- Full database synchronization across multiple tables
- Table routing: map source tables to different target table names, including sharded database merge
## Prerequisites

Before you begin, make sure you have:

- A MaxCompute project. To create one, see Create a MaxCompute project.
- Docker and Docker Compose installed on your machine.
- A Java Runtime Environment (JRE) compatible with Flink 1.18.0.
## Usage notes

- Table type selection: If the source table has a primary key, a Delta table is created automatically. If the source table has no primary key, a MaxCompute standard table is created. A DDL sketch follows this list.
- Write behavior for standard tables: `DELETE` operations are ignored, and `UPDATE` operations are treated as `INSERT` operations.
- Delivery semantics: Only at-least-once semantics is supported. To achieve idempotent writes, use a Delta table: the primary key in the Delta table handles deduplication.
- Schema evolution: Schema changes from the source table are propagated to MaxCompute. New columns are always appended as the last column. Column data types can be changed only to a compatible type. For details, see Change the data type of a column.
- Sharded database merge: Merging multiple source tables into one sink table by using regular expressions is supported. Scenarios in which duplicate primary key values exist across multiple source tables are not supported; support is planned for a later connector version.
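To make the first two notes concrete, the following is a minimal sketch of the kind of DDL that automatic table creation corresponds to. It assumes MaxCompute's transactional-table syntax for Delta tables; the exact statements the connector generates may differ, and the `write.bucket.num` value shown mirrors the `bucketsNum: 8` sink setting used later in this tutorial.

```sql
-- A source table with a primary key (such as app_db.orders below) is created
-- as a Delta table: a transactional MaxCompute table with a primary key.
CREATE TABLE orders (
    id    INT NOT NULL,
    price DECIMAL(10,2),
    PRIMARY KEY (id)
) TBLPROPERTIES ("transactional"="true", "write.bucket.num"="8");

-- A source table without a primary key becomes a standard MaxCompute table,
-- for which DELETE events are ignored and UPDATE events are written as inserts.
CREATE TABLE orders_without_pk (
    id    INT,
    price DECIMAL(10,2)
);
```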
## Get started
This tutorial walks through a complete streaming ETL job that synchronizes change data from MySQL to MaxCompute using a Flink CDC pipeline. It covers full database synchronization, schema evolution, and table routing.
### Set up the environments

#### Set up a Flink cluster in standalone mode
1. Download flink-1.18.0-bin-scala_2.12.tgz and decompress it to get the `flink-1.18.0` directory. In the `flink-1.18.0` directory, run the following command to set `FLINK_HOME`:

   ```bash
   export FLINK_HOME=$(pwd)
   ```

2. Open `$FLINK_HOME/conf/flink-conf.yaml` with a text editor and add the following parameters:

   ```yaml
   # Enable checkpointing. Run a checkpoint every 3 seconds.
   # For production, set the checkpoint interval to at least 30 seconds.
   execution.checkpointing.interval: 3000

   # The Flink CDC pipeline connector relies on Flink's communication mechanism
   # for data synchronization. Increase the timeout to prevent connection drops.
   pekko.ask.timeout: 60s
   ```

3. Start the Flink cluster:

   ```bash
   ./bin/start-cluster.sh
   ```

   After startup, open `http://localhost:8081/` in a browser to access the Flink web UI. Run `start-cluster.sh` multiple times to add more TaskManagers.
#### Set up a MySQL environment

This tutorial uses Docker Compose to run MySQL.
1. Create a file named `docker-compose.yaml` with the following content:

   ```yaml
   version: '2.1'
   services:
     mysql:
       image: debezium/example-mysql:1.1
       ports:
         - "3306:3306"
       environment:
         - MYSQL_ROOT_PASSWORD=123456
         - MYSQL_USER=mysqluser
         - MYSQL_PASSWORD=mysqlpw
   ```

   | Parameter | Description |
   |---|---|
   | `version` | The Docker Compose file format version |
   | `image` | The Docker image and tag. Set to `debezium/example-mysql:1.1`. |
   | `ports` | The MySQL port mapping |
   | `environment` | The MySQL root password and user credentials |

2. In the directory containing `docker-compose.yaml`, start the containers:

   ```bash
   docker-compose up -d
   ```

   Run `docker ps` to confirm all containers are running.
#### Prepare data in MySQL

1. Connect to the MySQL container:

   ```bash
   docker-compose exec mysql mysql -uroot -p123456
   ```

2. Create the `app_db` database and populate three tables:

   1. Create the database:

      ```sql
      CREATE DATABASE app_db;
      USE app_db;
      ```

   2. Create and populate the `orders` table:

      ```sql
      CREATE TABLE `orders` (
        `id` INT NOT NULL,
        `price` DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (`id`)
      );
      INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
      INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
      ```

   3. Create and populate the `shipments` table:

      ```sql
      CREATE TABLE `shipments` (
        `id` INT NOT NULL,
        `city` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
      );
      INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
      INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
      ```

   4. Create and populate the `products` table:

      ```sql
      CREATE TABLE `products` (
        `id` INT NOT NULL,
        `product` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
      );
      INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
      INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
      INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
      ```
### Submit a YAML pipeline job

1. Download the required JAR packages:

   - Flink CDC package: Download flink-cdc-3.1.1-bin.tar.gz and decompress it to get the `flink-cdc-3.1.1` directory. Move the contents of its `bin`, `lib`, `log`, and `conf` directories into the corresponding directories under `flink-1.18.0`.
   - Connector packages: Download the following JARs and place them in the `flink-1.18.0/lib` directory:

     > Note: The download links above point to released versions only. To use a SNAPSHOT version, compile the source code from the master or release branch locally.

   - JDBC driver: Download mysql-connector-java-8.0.27.jar. Pass it to the Flink CDC CLI with `--jar` (for example, `./bin/flink-cdc.sh mysql-to-maxcompute.yaml --jar mysql-connector-java-8.0.27.jar`), or place it in `$FLINK_HOME/lib` and restart the cluster. CDC connectors no longer bundle JDBC drivers.
2. Create the pipeline YAML file. The following example (`mysql-to-maxcompute.yaml`) synchronizes all tables from the `app_db` database:

   ```yaml
   ################################################################################
   # Description: Sync MySQL all tables to MaxCompute
   ################################################################################
   source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db\.*
     server-id: 5400-5404
     server-time-zone: UTC

   # Configure the accessId, accessKey, endpoint, and project parameters.
   sink:
     type: maxcompute
     name: MaxComputeSink
     accessId: ${your_accessId}
     accessKey: ${your_accessKey}
     endpoint: ${your_maxcompute_endpoint}
     project: ${your_project}
     bucketsNum: 8

   pipeline:
     name: Sync MySQL Database to MaxCompute
     parallelism: 1
   ```

   Replace the following placeholders:

   | Placeholder | Description |
   |---|---|
   | `${your_accessId}` | Your AccessKey ID. Get it from the AccessKey Pair page. |
   | `${your_accessKey}` | The AccessKey secret that corresponds to your AccessKey ID. |
   | `${your_maxcompute_endpoint}` | The MaxCompute endpoint for your region and network. See Endpoints. |
   | `${your_project}` | Your MaxCompute project name. Find it in the MaxCompute console under Workspace > Projects. |

   For all available source parameters, see MySQL connector. For all available sink parameters, see Connector configuration.
3. Submit the pipeline job to the standalone Flink cluster:

   ```bash
   ./bin/flink-cdc.sh mysql-to-maxcompute.yaml
   ```

   A successful submission returns output similar to the following:

   ```
   Pipeline has been submitted to cluster.
   Job ID: f9f9689866946e25bf151ecc179ef46f
   Job Description: Sync MySQL Database to MaxCompute
   ```

   The Flink web UI shows a running job named Sync MySQL Database to MaxCompute.
4. In MaxCompute, verify that the tables are created and data is written:

   ```sql
   -- Query the orders table.
   read orders;
   -- Expected result:
   +------------+------------+
   | id         | price      |
   +------------+------------+
   | 1          | 4          |
   | 2          | 100        |
   +------------+------------+

   -- Query the shipments table.
   read shipments;
   -- Expected result:
   +------------+------------+
   | id         | city       |
   +------------+------------+
   | 1          | beijing    |
   | 2          | xian       |
   +------------+------------+

   -- Query the products table.
   read products;
   -- Expected result:
   +------------+------------+
   | id         | product    |
   +------------+------------+
   | 3          | Peanut     |
   | 1          | Beer       |
   | 2          | Cap        |
   +------------+------------+
   ```
### Synchronize changes in real time

With the pipeline running, changes made in MySQL are reflected in MaxCompute in real time. The following steps use the `orders` table to demonstrate insert, schema change, update, and delete operations.
1. Connect to the MySQL container:

   ```bash
   docker-compose exec mysql mysql -uroot -p123456
   ```

2. Insert a row into the `orders` table:

   ```sql
   INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
   ```

   Run `read orders;` in MaxCompute. The new row appears:

   ```
   +------------+------------+
   | id         | price      |
   +------------+------------+
   | 3          | 100        |
   | 1          | 4          |
   | 2          | 100        |
   +------------+------------+
   ```

3. Add a column to the `orders` table in MySQL:

   ```sql
   ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
   ```

   Run `read orders;` in MaxCompute. The new `amount` column is propagated automatically:

   ```
   +------------+------------+------------+
   | id         | price      | amount     |
   +------------+------------+------------+
   | 3          | 100        | NULL       |
   | 1          | 4          | NULL       |
   | 2          | 100        | NULL       |
   +------------+------------+------------+
   ```

4. Update a row in the `orders` table:

   ```sql
   UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
   ```

   Run `read orders;` in MaxCompute:

   ```
   +------------+------------+------------+
   | id         | price      | amount     |
   +------------+------------+------------+
   | 3          | 100        | NULL       |
   | 1          | 100        | 100.00     |
   | 2          | 100        | NULL       |
   +------------+------------+------------+
   ```

5. Delete a row from the `orders` table:

   ```sql
   DELETE FROM app_db.orders WHERE id=2;
   ```

   Run `read orders;` in MaxCompute:

   ```
   +------------+------------+------------+
   | id         | price      | amount     |
   +------------+------------+------------+
   | 3          | 100        | NULL       |
   | 1          | 100        | 100.00     |
   +------------+------------+------------+
   ```
### Route tables

Flink CDC supports routing the schema and data of a source table to a sink table with a different name. Use this feature for database migrations, table renames, or sharded database merges.

The following YAML example routes each source table to a target table with a different name:
```yaml
################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db\.*
  server-id: 5400-5404
  server-time-zone: UTC

# Configure the accessId, accessKey, endpoint, and project parameters.
sink:
  type: maxcompute
  name: MaxComputeSink
  accessId: ${your_accessId}
  accessKey: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  bucketsNum: 8

route:
  - source-table: app_db.orders
    sink-table: ods_db.ods_orders
  - source-table: app_db.shipments
    sink-table: ods_db.ods_shipments
  - source-table: app_db.products
    sink-table: ods_db.ods_products

pipeline:
  name: Sync MySQL Database to MaxCompute
  parallelism: 1
```
The `source-table` field supports regular expressions. To merge multiple sharded tables into a single target table, use a pattern that matches all shards:
```yaml
route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders
```
This configuration aggregates data from `app_db.order01`, `app_db.order02`, `app_db.order03`, and similar tables into `ods_db.ods_orders`.
Scenarios in which duplicate primary key values exist across multiple source tables are not supported; support is planned for a later connector version.
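With such a merge route in place, a quick way to confirm the result is to query the merged table in MaxCompute, reusing the `read` pattern from earlier in this tutorial. The sketch below assumes a project without schema model support, in which case the `ods_db` namespace is ignored and the table is created directly in the project (see Table location mappings in the appendix):

```sql
-- After the routing pipeline has run, rows from all matched shard tables
-- (app_db.order01, app_db.order02, ...) should appear under the routed name.
read ods_orders;
```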
For all route parameters, see Route.
## Clean up

After finishing the tutorial, stop all running components.

1. Stop the Docker containers. Run the following command in the directory containing `docker-compose.yaml`:

   ```bash
   docker-compose down
   ```

2. Stop the Flink cluster. Run the following command in the `flink-1.18.0` directory:

   ```bash
   ./bin/stop-cluster.sh
   ```
## Appendix

### Connector configuration

The following table lists all configuration parameters for the MaxCompute sink connector.

Data is flushed to MaxCompute when the buffer for a partition or bucket exceeds the configured size threshold, or when a checkpoint is triggered. For example, with the default `bucketBatchSize` of 4MB, a Delta table bucket is flushed as soon as its buffered data exceeds 4 MB, even if no checkpoint has occurred yet.
| Parameter | Required | Default | Type | Description |
|---|---|---|---|---|
| `type` | Yes | — | String | The connector type. Set to `maxcompute`. |
| `name` | No | — | String | The display name of the sink. |
| `accessId` | Yes | — | String | Your AccessKey ID. Get it from the AccessKey Pair page. |
| `accessKey` | Yes | — | String | The AccessKey secret that corresponds to your AccessKey ID. |
| `endpoint` | Yes | — | String | The MaxCompute endpoint. Configure it based on your region and network connection method. See Endpoints. |
| `project` | Yes | — | String | The MaxCompute project name. Find it in the MaxCompute console under Workspace > Projects. |
| `tunnelEndpoint` | No | — | String | The MaxCompute Tunnel service endpoint. Automatically routed in most cases. Configure it only in a special network environment, such as behind a proxy. |
| `quotaName` | No | — | String | The name of the exclusive resource group for MaxCompute Tunnel. If not set, a shared resource group is used. |
| `stsToken` | No | — | String | Required when you use a Security Token Service (STS) token issued by a RAM role for authentication. |
| `bucketsNum` | No | 16 | Integer | The number of buckets used when a Delta table is auto-created. See Near-real-time data warehouse overview. |
| `compressAlgorithm` | No | `zlib` | String | The compression algorithm for writes. Valid values: `raw` (no compression), `zlib`, and `snappy`. |
| `totalBatchSize` | No | `64MB` | String | The maximum buffer size per partition or non-partitioned table. When the buffer exceeds this value, data is flushed to MaxCompute. Each partition or non-partitioned table maintains an independent buffer. |
| `bucketBatchSize` | No | `4MB` | String | The maximum buffer size per bucket. Applies to Delta table writes only. When the buffer exceeds this value, data is flushed to MaxCompute. Each bucket maintains an independent buffer. |
| `numCommitThreads` | No | 16 | Integer | The maximum number of partitions or tables processed concurrently during a checkpoint. |
| `numFlushConcurrent` | No | 4 | Integer | The maximum number of buckets written to MaxCompute concurrently. Applies to Delta table writes only. |
| `retryTimes` | No | 3 | Integer | The maximum number of retries after a network error. |
| `sleepMillis` | No | — | Long | The wait time between retries after a network error, in milliseconds. |
### Table location mappings

When the connector creates a table automatically, it maps location information from the source to MaxCompute as follows.

If the MaxCompute project does not support the schema model, each synchronization task can sync only one MySQL database, and the connector ignores the `TableId.namespace` information of other data sources.
| Object in Flink CDC | Location in MaxCompute | Location in MySQL |
|---|---|---|
| Project (from the configuration file) | Project | — |
| `TableId.namespace` | Schema (requires schema model support; ignored if the project does not support schema models) | Database |
| `TableId.tableName` | Table | Table |
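For example, assuming a schema-enabled project and the `app_db` database from this tutorial, the MySQL database name becomes a MaxCompute schema, so a synced table can be addressed with a three-part name; `your_project` is a placeholder:

```sql
-- Hypothetical query in a schema-enabled project: TableId.namespace (the MySQL
-- database name, app_db) maps to the schema, TableId.tableName to the table.
SELECT * FROM your_project.app_db.orders;
```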
### Data type mappings
The following table shows how Flink data types map to MaxCompute data types.
| Flink data type | MaxCompute data type | Note |
|---|---|---|
| CHAR/VARCHAR | STRING | MaxCompute stores all string types as STRING. |
| BOOLEAN | BOOLEAN | — |
| BINARY/VARBINARY | BINARY | — |
| DECIMAL | DECIMAL | — |
| TINYINT | TINYINT | — |
| SMALLINT | SMALLINT | — |
| INTEGER | INTEGER | — |
| BIGINT | BIGINT | — |
| FLOAT | FLOAT | — |
| DOUBLE | DOUBLE | — |
| TIME_WITHOUT_TIME_ZONE | STRING | MaxCompute does not have a native TIME type. Time values are stored as strings. |
| DATE | DATE | — |
| TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ | Stored without time zone information. |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP | — |
| TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP | — |
| ARRAY | ARRAY | — |
| MAP | MAP | — |
| ROW | STRUCT | — |
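As a concrete illustration of these mappings, the following sketch pairs a hypothetical MySQL table with the MaxCompute definition that the table above implies. This is illustrative only; the actual DDL generated by the connector may differ, and `t_sketch` is a made-up name.

```sql
-- Hypothetical MySQL source table (Flink type for each column in comments):
CREATE TABLE t_sketch (
    id    INT NOT NULL,        -- INTEGER                      -> INTEGER
    name  VARCHAR(255),        -- VARCHAR                      -> STRING
    price DECIMAL(10,2),       -- DECIMAL                      -> DECIMAL
    dt    DATETIME,            -- TIMESTAMP_WITHOUT_TIME_ZONE  -> TIMESTAMP_NTZ
    PRIMARY KEY (id)
);

-- MaxCompute table implied by the mapping table above (a Delta table, because
-- the source table has a primary key):
CREATE TABLE t_sketch (
    id    INT NOT NULL,
    name  STRING,
    price DECIMAL(10,2),
    dt    TIMESTAMP_NTZ,
    PRIMARY KEY (id)
) TBLPROPERTIES ("transactional"="true");
```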