MaxCompute: Open-source Flink CDC for near-real-time writes to Delta Table

Last Updated: Apr 15, 2025

MaxCompute provides a new version of the Flink Change Data Capture (CDC) connector. You can use the Flink CDC connector to synchronize data from a data source, such as MySQL, to MaxCompute standard tables or Delta tables in real time. This topic describes how to use the new version of the Flink CDC connector to synchronize data to MaxCompute.

Background information of Flink CDC

Flink CDC is an end-to-end open source tool for real-time data integration. It defines a set of fully functional APIs and provides an extract, transform, and load (ETL) task framework to process data. You can run Apache Flink jobs to use the features provided by Flink CDC. For more information, see Welcome to Flink CDC. Deeply integrated with and powered by Apache Flink, Flink CDC provides the following core features:

  • Provides an end-to-end data integration framework.

  • Provides APIs for data integration users to build jobs in an efficient manner.

  • Processes multiple tables in a source or sink.

  • Supports synchronization of all data from a database.

  • Supports schema evolution.

Prerequisites

A MaxCompute project is created. For more information, see Create a MaxCompute project.

Precautions

  • The Flink CDC connector can automatically create tables. When you use the connector to synchronize data, the locations and data types of the source table are automatically mapped to the MaxCompute table. If the source table has a primary key, a Delta table is automatically created. If the source table does not have a primary key, a MaxCompute standard table is created. For more information about the location and data type mappings between the source table and the MaxCompute table, see Table location mappings and Data type mappings.

  • If data is written to a MaxCompute standard table, the system ignores DELETE operations, and UPDATE operations are treated as INSERT operations. For an example, see the SQL sketch after this list.

  • Only the at-least-once semantics is supported. Idempotent writes can be implemented in a Delta table based on the primary key in the Delta table.

  • Schema changes of the source table can be synchronized to the MaxCompute table.

  • A new column can only be appended to the MaxCompute table as the last column.

  • You can change the data type of a column only to a data type that is compatible with the original data type. For more information about the conversion between data types, see Change the data type of a column.
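
The following SQL sketch illustrates the standard-table behavior that is described above. The source table t_log is a hypothetical MySQL table without a primary key, so the connector creates a MaxCompute standard table for it; the comments describe the expected result in MaxCompute.

    -- Hypothetical source table without a primary key.
    CREATE TABLE `t_log` (
      `id` INT NOT NULL,
      `msg` VARCHAR(255)
    );

    INSERT INTO `t_log` (`id`, `msg`) VALUES (1, 'created');  -- One row is written to MaxCompute.
    UPDATE `t_log` SET `msg` = 'updated' WHERE `id` = 1;      -- Treated as an INSERT: MaxCompute now contains two rows for id 1.
    DELETE FROM `t_log` WHERE `id` = 1;                       -- Ignored: both rows remain in the MaxCompute standard table.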

Getting started

This section describes how to develop a streaming ETL job that synchronizes change data from MySQL to MaxCompute by using a Flink CDC pipeline. The pipeline can synchronize all data in a database, synchronize table schema changes, and merge data from tables in sharded databases.

Prepare environments

Prepare a Flink cluster that is deployed in standalone mode

  1. Download the flink-1.18.0-bin-scala_2.12.tgz package and decompress it to obtain the flink-1.18.0 directory. Go to the flink-1.18.0 directory and run the following command to set the FLINK_HOME environment variable to the installation directory:

    export FLINK_HOME=$(pwd)
  2. Open the flink-conf.yaml file in the flink-1.18.0/conf directory, for example, by running the vim flink-conf.yaml command. Add the following parameters to the configuration file and save the file.

    # Enable the checkpointing feature. Run a checkpoint every 3 seconds.
    # This configuration is provided for testing purposes only. We recommend that you set the checkpoint interval to no less than 30 seconds for a job.
    execution.checkpointing.interval: 3000
    
    # flink-cdc-pipeline-connector-maxcompute relies on the communication mechanism of Flink for data synchronization. 
    # Increase the timeout period for Flink communication.
    pekko.ask.timeout: 60s
  3. Run the following command to start the Flink cluster:

    ./bin/start-cluster.sh

    If the startup is successful, you can enter http://localhost:8081/ in the address bar of a web browser to access the Flink web UI. 8081 is the default port number.

    You can run the start-cluster.sh command multiple times to run multiple TaskManagers in parallel.
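
    Optionally, you can verify the cluster from the command line. The following sketch assumes that curl is available and that the cluster uses the default port 8081; it queries the Flink REST API for a cluster overview.

    # Returns a JSON overview of the cluster, including the number of registered TaskManagers and task slots.
    curl http://localhost:8081/overview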

Prepare a MySQL environment

In this example, Docker Compose is used to prepare a MySQL environment.

  1. Create a file named docker-compose.yaml that defines the MySQL container to start. The following code shows the file content:

    version: '2.1'
    services:
      mysql:
        image: debezium/example-mysql:1.1
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw

    The following table describes the parameters.

    Parameter   | Description
    ------------|-----------------------------------------------------------------------------
    version     | The version of the Docker Compose file format.
    image       | The Docker image to use. Set the value to debezium/example-mysql:1.1.
    ports       | The port mapping of the MySQL instance. Host port 3306 maps to container port 3306.
    environment | The environment variables of the MySQL container: the password of the root user, and the username and password of an additional MySQL user.

    In this example, the MySQL database app_db that contains product information is used in Docker Compose.

  2. Run the following command in the directory in which the docker-compose.yaml file is stored to start the required components:

    docker-compose up -d

    This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. You can run the docker ps command to check whether the containers are started.
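
    For example, the following commands (a sketch based on the docker-compose.yaml file above) confirm that the containers are running and that the MySQL server accepts connections:

    # List the containers that are defined in the Compose file and their status.
    docker-compose ps

    # Check that the MySQL server responds. The password 123456 comes from MYSQL_ROOT_PASSWORD.
    docker-compose exec mysql mysqladmin ping -uroot -p123456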

Prepare data in the MySQL database

  1. Run the following command to access the MySQL container:

    docker-compose exec mysql mysql -uroot -p123456
  2. Create a database in MySQL and prepare table data.

    1. Create a database.

      CREATE DATABASE app_db;
      USE app_db;
    2. Prepare table data.

      • Create a table named orders and insert data into the table.

        CREATE TABLE `orders` (
        `id` INT NOT NULL,
        `price` DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- Insert data into the table.
        INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
        INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
      • Create a table named shipments and insert data into the table.

        CREATE TABLE `shipments` (
        `id` INT NOT NULL,
        `city` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- Insert data into the table.
        INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
        INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
      • Create a table named products and insert data into the table.

        CREATE TABLE `products` (
        `id` INT NOT NULL,
        `product` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- Insert data into the table.
        INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
        INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
        INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

Submit the YAML file by using the Flink CDC CLI

  1. Download the required JAR packages:

    • flink-cdc package

      Go to the flink-cdc page to download the binary package flink-cdc-3.1.1-bin.tar.gz and decompress it to obtain the flink-cdc-3.1.1 directory. The flink-cdc-3.1.1 directory contains the bin, lib, log, and conf directories. Move the files in these directories to the corresponding directories of flink-1.18.0.

    • Connector packages

      Download the connector packages that the pipeline requires, which are the MySQL pipeline connector (flink-cdc-pipeline-connector-mysql) and the MaxCompute pipeline connector (flink-cdc-pipeline-connector-maxcompute), and move the packages to the flink-1.18.0/lib directory.

      Note

      You can click the download links to download only the released connector versions. If you want to reference a SNAPSHOT version, you must compile the source code of the version based on master or release branches on your on-premises machine.

    • Driver package

      Download the MySQL Connector Java package and pass the package to the Flink CDC CLI by using the --jar parameter, or place the package in the flink-1.18.0/lib directory and restart the Flink cluster. This is required because CDC connectors no longer contain these drivers. Both options are shown in the sketch below.
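
      The following shell sketch shows both options. The driver version 8.0.27 is only an example; use the version that matches your MySQL server, and run the commands from the flink-1.18.0 directory.

      # Option 1: place the driver in the lib directory and restart the Flink cluster.
      cp /path/to/mysql-connector-java-8.0.27.jar lib/
      ./bin/stop-cluster.sh && ./bin/start-cluster.sh

      # Option 2: pass the driver to the Flink CDC CLI when you submit the pipeline (see the next step).
      ./bin/flink-cdc.sh mysql-to-maxcompute.yaml --jar lib/mysql-connector-java-8.0.27.jar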

  2. Write a YAML file for task configuration. The following sample code provides an example of the file mysql-to-maxcompute.yaml for database synchronization:

    ################################################################################
    # Description: Sync MySQL all tables to MaxCompute
    ################################################################################
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db.\.*
      server-id: 5400-5404
      server-time-zone: UTC
    
    # Configure the accessId, accessKey, endpoint, and project parameters.
    sink:
       type: maxcompute
       name: MaxComputeSink
       accessId: ${your_accessId}
       accessKey: ${your_accessKey}
       endpoint: ${your_maxcompute_endpoint}
       project: ${your_project}
       bucketsNum: 8
    
    pipeline:
      name: Sync MySQL Database to MaxCompute
      parallelism: 1
    

    For more information about the parameters in the sink section, see the Configuration items of the connector section in the appendixes.

  3. Run the following command to submit the task to the Flink cluster that is deployed in standalone mode:

    ./bin/flink-cdc.sh mysql-to-maxcompute.yaml

    After the task is submitted, the following result is returned:

    Pipeline has been submitted to cluster.
    Job ID: f9f9689866946e25bf151ecc179ef46f
    Job Description: Sync MySQL Database to MaxCompute

    In the Flink web UI, a task named Sync MySQL Database to MaxCompute is running.

  4. Run the following commands in MaxCompute to check whether the orders, shipments, and products tables are created and whether data is written to the tables.

    -- Query the orders table.
    read orders;
    
    -- The following result is returned:
    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
    
    -- Query the shipments table.
    read shipments;
    
    -- The following result is returned:
    +------------+------------+
    | id         | city       |
    +------------+------------+
    | 1          | beijing    |
    | 2          | xian       |
    +------------+------------+
    
    -- Query the products table.
    read products;
    
    -- The following result is returned:
    +------------+------------+
    | id         | product    |
    +------------+------------+
    | 3          | Peanut     |
    | 1          | Beer       |
    | 2          | Cap        |
    +------------+------------+

Synchronize changes in real time

In this example, the orders table is used. When data in the source table in the MySQL database is modified, the data in the destination MaxCompute table is also modified in real time.

  1. Run the following command to access the MySQL container:

    docker-compose exec mysql mysql -uroot -p123456
  2. Insert a data record into the orders table of MySQL.

    INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

    Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 3          | 100        |
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
  3. Add a field to the orders table of MySQL.

    ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

    Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 4          | NULL       |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  4. Update a data record in the orders table of MySQL.

    UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

    Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  5. Delete a data record from the orders table of MySQL.

    DELETE FROM app_db.orders WHERE id=2;

    Run the read orders; command in MaxCompute to query data in the orders table. The following result is returned:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    +------------+------------+------------+

Each time you perform an operation in MySQL and then query the orders table in MaxCompute, you can see that the data in MaxCompute is updated in real time.

Route the changes

Flink CDC provides a route feature that maps the schema and data of a source table to a different destination table. You can use this feature to perform operations such as table or database name replacement and merging tables from sharded databases during synchronization. The following sample code provides an example.

################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*
   server-id: 5400-5404
   server-time-zone: UTC

# Configure the accessId, accessKey, endpoint, and project parameters.
sink:
   type: maxcompute
   name: MaxComputeSink
   accessId: ${your_accessId}
   accessKey: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   bucketsNum: 8

route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
     sink-table: ods_db.ods_products

pipeline:
   name: Sync MySQL Database to MaxCompute
   parallelism: 1

For more information about the parameters in the route section, see Route.

The preceding configuration in the route section is used to synchronize the schema and data of the app_db.orders table to the ods_db.ods_orders table. This way, database migration is implemented. You can use regular expressions to match multiple tables in source-table to synchronize data from multiple tables in a sharded database. Sample code:

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

In this case, data of tables, such as app_db.order01, app_db.order02, and app_db.order03, can be aggregated into the ods_db.ods_orders table.

Note

Scenarios in which duplicate primary key values exist across multiple source tables are not supported. Support will be added in later connector versions.

Clear the environments

After you perform the preceding operations, you must clear the environments.

  1. Run the following command in the directory in which the docker-compose.yaml file is stored to stop all containers:

    docker-compose down
  2. Run the following command in the flink-1.18.0 directory to stop the Flink cluster:

    ./bin/stop-cluster.sh

Appendixes

Configuration items of the connector

Parameter          | Required | Default value | Data type | Description
-------------------|----------|---------------|-----------|------------------------------------------------------------
type               | Yes      | none          | String    | The connector that you want to use. Set the value to maxcompute.
name               | No       | none          | String    | The name of the sink.
accessId           | Yes      | none          | String    | The AccessKey ID of your Alibaba Cloud account or RAM user. You can obtain the AccessKey ID from the AccessKey Pair page.
accessKey          | Yes      | none          | String    | The AccessKey secret that corresponds to the AccessKey ID.
endpoint           | Yes      | none          | String    | The endpoint of MaxCompute. Configure this parameter based on the region and network connection method that you selected when you created the MaxCompute project. For more information about the endpoints of different regions and network connection methods, see Endpoints.
project            | Yes      | none          | String    | The name of the MaxCompute project. To obtain the name, log on to the MaxCompute console and choose Workspace > Projects in the left-side navigation pane.
tunnelEndpoint     | No       | none          | String    | The endpoint of the MaxCompute Tunnel service. In most cases, this endpoint is automatically routed based on the region in which the project resides. Configure it only in special network environments, such as environments in which a proxy is used.
quotaName          | No       | none          | String    | The name of the exclusive resource group for MaxCompute Tunnel. If this parameter is not configured, the shared resource group is used.
stsToken           | No       | none          | String    | The STS token. This parameter is required if you use a Security Token Service (STS) token issued by a RAM role for authentication.
bucketsNum         | No       | 16            | Integer   | The number of buckets that are used when the connector automatically creates a MaxCompute Delta table. For more information, see Near-real-time data warehouse overview.
compressAlgorithm  | No       | zlib          | String    | The compression algorithm that is used when data is written to MaxCompute. Valid values: raw, zlib, and snappy. The value raw indicates that data is not compressed.
totalBatchSize     | No       | 64MB          | String    | The maximum amount of buffered data per partition or per non-partitioned table. The buffers of different partitions or non-partitioned tables are independent of each other. When the buffered data exceeds this value, the data is written to MaxCompute.
bucketBatchSize    | No       | 4MB           | String    | The maximum amount of buffered data per bucket. This parameter takes effect only when data is written to Delta tables. The buffers of different buckets are independent of each other. When the buffered data exceeds this value, the data is written to MaxCompute.
numCommitThreads   | No       | 16            | Integer   | The maximum number of partitions or tables that can be processed at the same time during the checkpointing operation.
numFlushConcurrent | No       | 4             | Integer   | The maximum number of buckets that can be written to MaxCompute at the same time. This parameter takes effect only when data is written to Delta tables.
retryTimes         | No       | 3             | Integer   | The maximum number of retries when a network connection error occurs.
sleepMillis        | No       | 1000          | Long      | The waiting duration for each retry when a network connection error occurs. Unit: milliseconds.
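
The following YAML sketch shows a sink configuration that sets several of the optional parameters listed above. The values are examples only, not recommendations; replace the placeholders with your own information.

sink:
   type: maxcompute
   name: MaxComputeSink
   accessId: ${your_accessId}
   accessKey: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   # Optional parameters. The values below are examples.
   bucketsNum: 16
   compressAlgorithm: snappy
   totalBatchSize: 64MB
   bucketBatchSize: 4MB
   numCommitThreads: 16
   numFlushConcurrent: 4
   retryTimes: 3
   sleepMillis: 1000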

Table location mappings

When the Flink CDC connector automatically creates a table, the location information of the source table is mapped to the MaxCompute table based on the mappings described in the following table.

Important

If a MaxCompute project does not support schema models, each synchronization task can synchronize data of only one MySQL database. For other data sources, the Flink CDC connector ignores the tableId.namespace information.

Object in Flink CDC               | Location in MaxCompute                                                                                                    | Location in MySQL
----------------------------------|---------------------------------------------------------------------------------------------------------------------------|------------------
Project in the configuration file | Project                                                                                                                     | none
TableId.namespace                 | Schema (This mapping takes effect only when the MaxCompute project supports the schema model. Otherwise, it is ignored.)   | Database
TableId.tableName                 | Table                                                                                                                       | Table
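
For example, if project is set to your_project in the sink configuration, the MySQL table app_db.orders is created as your_project.app_db.orders when the MaxCompute project supports the schema model, and as your_project.orders when it does not.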

Data type mappings

Flink data type                | MaxCompute data type
-------------------------------|---------------------
CHAR/VARCHAR                   | STRING
BOOLEAN                        | BOOLEAN
BINARY/VARBINARY               | BINARY
DECIMAL                        | DECIMAL
TINYINT                        | TINYINT
SMALLINT                       | SMALLINT
INTEGER                        | INTEGER
BIGINT                         | BIGINT
FLOAT                          | FLOAT
DOUBLE                         | DOUBLE
TIME_WITHOUT_TIME_ZONE         | STRING
DATE                           | DATE
TIMESTAMP_WITHOUT_TIME_ZONE    | TIMESTAMP_NTZ
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP
TIMESTAMP_WITH_TIME_ZONE       | TIMESTAMP
ARRAY                          | ARRAY
MAP                            | MAP
ROW                            | STRUCT
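
As an illustration of these mappings, the following SQL sketch approximates the Delta table that the connector creates for the app_db.orders source table that is used in this topic (id INT as the primary key and price DECIMAL(10,2)). The actual DDL that the connector generates may differ; the bucket count follows the bucketsNum parameter in the sink configuration (8 in the earlier example).

    -- Approximate MaxCompute DDL for the automatically created Delta table.
    CREATE TABLE orders (
      id    INT NOT NULL,
      price DECIMAL(10,2),
      PRIMARY KEY (id)
    )
    TBLPROPERTIES ("transactional"="true", "write.bucket.num"="8");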