
MaxCompute: Open-source Flink CDC for near-real-time writes to Delta Table

Last Updated: Mar 26, 2026

MaxCompute provides a Flink Change Data Capture (CDC) connector that synchronizes change data from sources such as MySQL to MaxCompute standard tables or Delta tables in real time. This topic explains how to use the Flink CDC pipeline connector to set up a streaming extract, transform, and load (ETL) pipeline from MySQL to MaxCompute.

The connector supports:

  • Automatic table creation based on source table structure

  • Schema evolution: changes to the source table schema are propagated to MaxCompute

  • Full database synchronization across multiple tables

  • Table routing: map source tables to different target table names, including sharded database merge

Prerequisites

Before you begin, make sure you have:

  • A MaxCompute project. To create one, see Create a MaxCompute project.

  • Docker and Docker Compose installed on your machine

  • Java Runtime Environment (JRE) compatible with Flink 1.18.0

Usage notes

  • Table type selection: If the source table has a primary key, a Delta table is created automatically. If no primary key exists, a MaxCompute standard table is created.

  • Write behavior for standard tables: DELETE operations are ignored. UPDATE operations are treated as INSERT operations.

  • Delivery semantics: Only at-least-once semantics is supported. To achieve idempotent writes, use a Delta table; the primary key of the Delta table handles deduplication.

  • Schema evolution: Schema changes from the source table are propagated to MaxCompute. New columns are always appended as the last column. Column data types can only be changed to a compatible type (see the sketch after this list). For details, see Change the data type of a column.

  • Sharded database merge: Merging multiple source tables into one sink table by using regular expressions is supported. Merging source tables that contain duplicate primary key values is not yet supported; support is planned for a later connector version.
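
As a hedged sketch of the compatibility rule (the table and column names here are hypothetical), widening a numeric column on the MySQL source is typically a compatible change, whereas narrowing is not:

-- Widening a column, for example DECIMAL(10,2) to DECIMAL(12,2), is
-- typically compatible and propagates to the MaxCompute table.
ALTER TABLE my_db.my_table MODIFY amount DECIMAL(12,2);

-- Narrowing it back to DECIMAL(10,2) is an incompatible change and is
-- expected to fail to propagate.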

Get started

This tutorial walks through a complete streaming ETL job that synchronizes change data from MySQL to MaxCompute using a Flink CDC pipeline. It covers full database synchronization, schema evolution, and table routing.

Set up the environments

Set up a Flink cluster in standalone mode

  1. Download flink-1.18.0-bin-scala_2.12.tgz and decompress it to get the flink-1.18.0 directory. In the flink-1.18.0 directory, run the following command to set FLINK_HOME:

    export FLINK_HOME=$(pwd)
  2. Open $FLINK_HOME/conf/flink-conf.yaml with a text editor and add the following parameters:

    # Enable checkpointing. Run a checkpoint every 3 seconds.
    # For production, set the checkpoint interval to at least 30 seconds.
    execution.checkpointing.interval: 3000
    
    # The Flink CDC pipeline connector relies on Flink's communication mechanism
    # for data synchronization. Increase the timeout to prevent connection drops.
    pekko.ask.timeout: 60s
  3. Start the Flink cluster:

    ./bin/start-cluster.sh

    After startup, open http://localhost:8081/ in a browser to access the Flink web UI. Run start-cluster.sh multiple times to add TaskManagers.
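
    Optionally, confirm that the cluster is up and a TaskManager has registered by querying the Flink REST API (this assumes the default REST port 8081):

    # The response includes the number of registered TaskManagers and available slots.
    curl http://localhost:8081/overview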

Set up a MySQL environment

This tutorial uses Docker Compose to run MySQL.

  1. Create a file named docker-compose.yaml with the following content:

    version: '2.1'
    services:
      mysql:
        image: debezium/example-mysql:1.1
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw

    The file uses the following parameters:

    Parameter   | Description
    ----------- | -----------
    version     | The Docker Compose file format version.
    image       | The Docker image. Set to debezium/example-mysql:1.1.
    ports       | The MySQL port mapping between the container and the host.
    environment | The MySQL root password and user credentials.
  2. In the directory containing docker-compose.yaml, start the containers:

    docker-compose up -d

    Run docker ps to confirm all containers are running.
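
    Optionally, confirm that MySQL accepts connections, using the root credentials from docker-compose.yaml:

    docker-compose exec mysql mysql -uroot -p123456 -e "SELECT VERSION();"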

Prepare data in MySQL

  1. Connect to the MySQL container:

    docker-compose exec mysql mysql -uroot -p123456
  2. Create the app_db database and populate three tables:

    1. Create the database:

      CREATE DATABASE app_db;
      USE app_db;
    2. Create and populate the orders table:

      CREATE TABLE `orders` (
        `id` INT NOT NULL,
        `price` DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (`id`)
      );
      
      INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
      INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
    3. Create and populate the shipments table:

      CREATE TABLE `shipments` (
        `id` INT NOT NULL,
        `city` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
      );
      
      INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
      INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
    4. Create and populate the products table:

      CREATE TABLE `products` (
        `id` INT NOT NULL,
        `product` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
      );
      
      INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
      INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
      INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
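
    Optionally, verify the data in the same MySQL session before you start the pipeline:

      SHOW TABLES;
      SELECT COUNT(*) FROM orders;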

Submit a YAML pipeline job

  1. Download the required JAR packages:

    • Flink CDC package: Download flink-cdc-3.1.1-bin.tar.gz and decompress it to get the flink-cdc-3.1.1 directory. Move the contents of its bin, lib, log, and conf directories into the corresponding directories under flink-1.18.0.

    • Connector packages: Download the MySQL pipeline connector JAR and the MaxCompute pipeline connector JAR, and place them in the flink-1.18.0/lib directory.

      Note: The download links point to released versions only. To use a SNAPSHOT version, compile the source code from the master or release branch locally.

    • JDBC driver: Download mysql-connector-java-8.0.27.jar. Pass it to the Flink CDC CLI with --jar (see the example after this list), or place it in $FLINK_HOME/lib and restart the cluster. The CDC connectors no longer bundle JDBC drivers.
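
    For example, if the driver JAR sits in the current directory (the path is illustrative), the submission command from step 3 becomes:

      ./bin/flink-cdc.sh mysql-to-maxcompute.yaml --jar mysql-connector-java-8.0.27.jar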

  2. Create the pipeline YAML file. The following example (mysql-to-maxcompute.yaml) synchronizes all tables from the app_db database:

    ################################################################################
    # Description: Sync MySQL all tables to MaxCompute
    ################################################################################
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db\.*
      server-id: 5400-5404
      server-time-zone: UTC
    
    # Configure the accessId, accessKey, endpoint, and project parameters.
    sink:
      type: maxcompute
      name: MaxComputeSink
      accessId: ${your_accessId}
      accessKey: ${your_accessKey}
      endpoint: ${your_maxcompute_endpoint}
      project: ${your_project}
      bucketsNum: 8
    
    pipeline:
      name: Sync MySQL Database to MaxCompute
      parallelism: 1

    Replace the following placeholders:

    Placeholder                 | Description
    --------------------------- | -----------
    ${your_accessId}            | Your AccessKey ID. Get it from the AccessKey Pair page.
    ${your_accessKey}           | The AccessKey secret that corresponds to your AccessKey ID.
    ${your_maxcompute_endpoint} | The MaxCompute endpoint for your region and network. See Endpoints.
    ${your_project}             | Your MaxCompute project name. Find it in the MaxCompute console under Workspace > Projects.

    For all available source parameters, see MySQL connector. For all available sink parameters, see Connector configuration in the appendix.

  3. Submit the pipeline job to the standalone Flink cluster:

    ./bin/flink-cdc.sh mysql-to-maxcompute.yaml

    A successful submission returns output similar to:

    Pipeline has been submitted to cluster.
    Job ID: f9f9689866946e25bf151ecc179ef46f
    Job Description: Sync MySQL Database to MaxCompute

    The Flink web UI shows a running job named Sync MySQL Database to MaxCompute.
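
    You can also list running jobs from the command line in the flink-1.18.0 directory:

    ./bin/flink list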

  4. In MaxCompute, verify that the tables are created and data is written:

    -- Query the orders table.
    read orders;
    
    -- Expected result:
    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
    
    -- Query the shipments table.
    read shipments;
    
    -- Expected result:
    +------------+------------+
    | id         | city       |
    +------------+------------+
    | 1          | beijing    |
    | 2          | xian       |
    +------------+------------+
    
    -- Query the products table.
    read products;
    
    -- Expected result:
    +------------+------------+
    | id         | product    |
    +------------+------------+
    | 3          | Peanut     |
    | 1          | Beer       |
    | 2          | Cap        |
    +------------+------------+
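
    The read command is a shortcut provided by the MaxCompute client (odpscmd); a standard SQL query returns the same data:

    -- Equivalent standard SQL query.
    SELECT * FROM orders;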

Synchronize changes in real time

With the pipeline running, changes made in MySQL are reflected in MaxCompute in real time. The following steps use the orders table to demonstrate insert, schema change, update, and delete operations.

  1. Connect to the MySQL container:

    docker-compose exec mysql mysql -uroot -p123456
  2. Insert a row into the orders table:

    INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

    Run read orders; in MaxCompute. The new row appears:

    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 3          | 100        |
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
  3. Add a column to the orders table in MySQL:

    ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

    Run read orders; in MaxCompute. The new amount column is propagated automatically:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 4          | NULL       |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  4. Update a row in the orders table:

    UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

    Run read orders; in MaxCompute:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  5. Delete a row from the orders table:

    DELETE FROM app_db.orders WHERE id=2;

    Run read orders; in MaxCompute:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    +------------+------------+------------+

Route tables

Flink CDC supports routing source table schemas and data to different sink table names. Use this feature for database migrations, table renames, or sharded database merges.

The following YAML example routes each source table to a target table with a different name:

################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db\.*
  server-id: 5400-5404
  server-time-zone: UTC

# Configure the accessId, accessKey, endpoint, and project parameters.
sink:
  type: maxcompute
  name: MaxComputeSink
  accessId: ${your_accessId}
  accessKey: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  bucketsNum: 8

route:
  - source-table: app_db.orders
    sink-table: ods_db.ods_orders
  - source-table: app_db.shipments
    sink-table: ods_db.ods_shipments
  - source-table: app_db.products
    sink-table: ods_db.ods_products

pipeline:
  name: Sync MySQL Database to MaxCompute
  parallelism: 1

The source-table field supports regular expressions. To merge multiple sharded tables into a single target table:

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

This configuration aggregates data from app_db.order01, app_db.order02, app_db.order03, and similar tables into ods_db.ods_orders.

Merging source tables that contain duplicate primary key values is not yet supported; support is planned for a later connector version.

For all route parameters, see Route.

Clean up

After finishing the tutorial, stop all running components.

  1. Stop the Docker containers. Run the following command in the directory containing docker-compose.yaml:

    docker-compose down
  2. Stop the Flink cluster. Run the following command in the flink-1.18.0 directory:

    ./bin/stop-cluster.sh

Appendix

Connector configuration

The following table lists all configuration parameters for the MaxCompute sink connector.

Data is flushed to MaxCompute when the buffer for a partition or bucket exceeds the configured size threshold, or when a checkpoint is triggered.
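
The following sketch shows how the buffer-related options fit into the sink block. The values shown are the defaults from the table below, not tuning recommendations:

sink:
  type: maxcompute
  # ... authentication and project options ...
  # Flush a partition's (or non-partitioned table's) buffer at 64 MB.
  totalBatchSize: 64MB
  # Flush a bucket's buffer at 4 MB (Delta table writes only).
  bucketBatchSize: 4MB
  # Commit up to 16 partitions or tables concurrently at a checkpoint.
  numCommitThreads: 16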

Parameter          | Required | Default | Type    | Description
------------------ | -------- | ------- | ------- | -----------
type               | Yes      | -       | String  | The connector type. Set to maxcompute.
name               | No       | -       | String  | The display name of the sink.
accessId           | Yes      | -       | String  | Your AccessKey ID. Get it from the AccessKey Pair page.
accessKey          | Yes      | -       | String  | The AccessKey secret that corresponds to your AccessKey ID.
endpoint           | Yes      | -       | String  | The MaxCompute endpoint. Configure it based on your region and network connection method. See Endpoints.
project            | Yes      | -       | String  | The MaxCompute project name. Find it in the MaxCompute console under Workspace > Projects.
tunnelEndpoint     | No       | -       | String  | The MaxCompute Tunnel service endpoint. Automatically routed in most cases. Configure it only in a special network environment, such as behind a proxy.
quotaName          | No       | -       | String  | The name of the exclusive resource group for MaxCompute Tunnel. If not set, a shared resource group is used.
stsToken           | No       | -       | String  | The STS token. Required when you use a Security Token Service (STS) token issued by a RAM role for authentication.
bucketsNum         | No       | 16      | Integer | The number of buckets for Delta table auto-creation. See Near-real-time data warehouse overview.
compressAlgorithm  | No       | zlib    | String  | The compression algorithm for writes. Valid values: raw (no compression), zlib, snappy.
totalBatchSize     | No       | 64MB    | String  | The maximum buffer size per partition or non-partitioned table. When the buffer exceeds this value, data is flushed to MaxCompute. Each partition or non-partitioned table maintains an independent buffer.
bucketBatchSize    | No       | 4MB     | String  | The maximum buffer size per bucket. Applies to Delta table writes only. When the buffer exceeds this value, data is flushed to MaxCompute. Each bucket maintains an independent buffer.
numCommitThreads   | No       | 16      | Integer | The maximum number of partitions or tables processed concurrently during a checkpoint.
numFlushConcurrent | No       | 4       | Integer | The maximum number of buckets written to MaxCompute concurrently. Applies to Delta table writes only.
retryTimes         | No       | 3       | Integer | The maximum number of retries on network error.
sleepMillis        | No       | -       | Long    | The wait duration between retries on network error. Unit: milliseconds.

Table location mappings

When the connector creates a table automatically, it maps location information from the source to MaxCompute as follows.

Important

If the MaxCompute project does not support schema models, each synchronization task can sync only one MySQL database. The connector ignores TableId.namespace information for other data sources.

Object in Flink CDC               | Location in MaxCompute | Location in MySQL
--------------------------------- | ---------------------- | -----------------
Project in the configuration file | Project                | -
TableId.namespace                 | Schema (requires schema model support; ignored if the project does not support schema models) | Database
TableId.tableName                 | Table                  | Table
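
For example, assuming a project named my_project (a hypothetical name) and the tutorial's MySQL table app_db.orders:

MySQL source table:                   app_db.orders
Project with schema model support:    my_project.app_db.orders
Project without schema model support: my_project.orders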

Data type mappings

The following table shows how Flink data types map to MaxCompute data types.

Flink data type                | MaxCompute data type | Note
------------------------------ | -------------------- | ----
CHAR/VARCHAR                   | STRING               | MaxCompute stores all string types as STRING.
BOOLEAN                        | BOOLEAN              |
BINARY/VARBINARY               | BINARY               |
DECIMAL                        | DECIMAL              |
TINYINT                        | TINYINT              |
SMALLINT                       | SMALLINT             |
INTEGER                        | INTEGER              |
BIGINT                         | BIGINT               |
FLOAT                          | FLOAT                |
DOUBLE                         | DOUBLE               |
TIME_WITHOUT_TIME_ZONE         | STRING               | MaxCompute does not have a native TIME type. Time values are stored as strings.
DATE                           | DATE                 |
TIMESTAMP_WITHOUT_TIME_ZONE    | TIMESTAMP_NTZ        | Stored without time zone information.
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP            |
TIMESTAMP_WITH_TIME_ZONE       | TIMESTAMP            |
ARRAY                          | ARRAY                |
MAP                            | MAP                  |
ROW                            | STRUCT               |
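
As a hedged sketch of how these mappings combine with automatic table creation, the orders table from this tutorial (INT primary key, DECIMAL price) written with bucketsNum: 8 corresponds roughly to the following Delta table. This is an approximation for illustration, not the connector's exact DDL:

-- Approximate auto-created Delta table (sketch, not exact connector output).
CREATE TABLE orders (
  id    INT NOT NULL,
  price DECIMAL(10,2),
  PRIMARY KEY (id)
)
TBLPROPERTIES (
  "transactional" = "true",   -- marks the table as a Delta table
  "write.bucket.num" = "8"    -- matches the bucketsNum sink option
);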