全部產品
Search
文件中心

MaxCompute:開源Flink CDC近即時寫入Delta Table

更新時間:Mar 29, 2025

MaxCompute為您提供對接Flink CDC的新版外掛程式Connector連接器。您可以通過對接Flink CDC,將資料來源(例如MySQL)資料即時同步至MaxCompute的目標表(普通表或Delta表)。本文為您介紹MaxCompute新版外掛程式的能力支援情況與主要操作流程。

Flink CDC背景介紹

Flink CDC是一個端到端的開源即時資料整合工具,定義了一套功能完整的編程介面和ETL資料處理架構,使用者可通過提交Flink作業使用其功能,詳情請參見Flink CDC。Flink CDC深度整合並由Apache Flink驅動,提供以下核心功能:

  • 端到端的Data Integration架構。

  • 為Data Integration的使用者提供了易於構建作業的API。

  • 支援在Source(資料來源)和Sink(輸出端)中處理多個表。

  • 整庫同步。

  • 具備表結構變更自動同步的能力(Schema Evolution)。

前提條件

已建立MaxCompute專案,詳情請參見建立MaxCompute專案

注意事項

  • 資料同步Connector連接器支援自動建表,將MaxCompute表與源表的位置關係、資料類型進行自動對應。當源表有主鍵時,會自動建立Delta表,否則會建立MaxCompute普通表。映射詳情請參見表位置映射資料類型映射

  • 當資料寫入至普通表時,系統會忽略DELETE操作,UPDATE操作會被視為INSERT操作。

  • 目前僅支援at-least-once,Delta表由於主鍵特效能夠實現等冪寫。

  • 對於表結構變更同步。

  • 新增列只能添加到最後一列。

  • 修改列類型,只能修改為相容的資料類型。資料類型相容表詳情請參見更改列資料類型

快速開始

本文將基於Flink CDC,快速構建MySQL到MaxCompute的Streaming ETL作業(MySQL to MaxCompute),實現Flink CDC Pipeline的編寫。其中包含整庫同步、表結構變更同步和分庫分表同步功能。

環境準備

準備Flink Standalone叢集

  1. 下載flink-1.18.0-bin-scala_2.12.tgz並解壓,解壓後得到flink-1.18.0目錄。進入flink-1.18.0目錄,執行以下命令,將FLINK_HOME設定為flink-1.18.0的安裝目錄。

    export FLINK_HOME=$(pwd)
  2. $flink-1.18.0/conf目錄下執行vim flink-conf.yaml命令,在設定檔中追加下列參數並儲存。

    # 開啟checkpoint,每隔3秒做一次checkpoint
    # 僅作測試使用,實際作業checkpoint間隔時間不建議低於30s
    execution.checkpointing.interval: 3000
    
    # 由於flink-cdc-pipeline-connector-maxcompute依賴flink通訊機制進行寫入同步,
    # 這裡適當增大訊息通訊逾時時間
    pekko.ask.timeout: 60s
  3. 執行如下命令,啟動Flink叢集。

    ./bin/start-cluster.sh

    如啟動成功,可以在http://localhost:8081/(8081為預設連接埠)訪問到Flink Web UI。

    多次執行start-cluster.sh可以拉起多個TaskManager,用於並發執行。

準備MySQL環境

此處以Docker Compose的方式為例指導您準備MySQL環境。

  1. 啟動Docker鏡像後,建立一個名為docker-compose.yaml的檔案,檔案內容如下:

    version: '2.1'
    services:
      mysql:
        image: debezium/example-mysql:1.1
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw

    參數說明:

    參數

    描述

    version

    Docker版本。

    image

    鏡像版本,配置為debezium/example-mysql:1.1。

    ports

    MySQL連接埠號碼。

    environment

    MySQL帳號密碼。

    該Docker Compose中包含的容器有:MySQL-包含商品資訊的資料庫app_db。

  2. 在docker-compose.yaml所在目錄執行如下命令,啟動所需組件:

    docker-compose up -d

    該命令將以Detached模式自動啟動Docker Compose配置中定義的所有容器。您可以執行docker ps命令查看上述容器是否已正常啟動。

在MySQL資料庫中準備資料

  1. 執行如下命令,進入MySQL容器。

    docker-compose exec mysql mysql -uroot -p123456
  2. 在MySQL中建立資料庫,並準備表資料。

    1. 建立資料庫。

      CREATE DATABASE app_db;
      USE app_db;
    2. 準備表資料。

      • 建立orders表,並插入資料。

        CREATE TABLE `orders` (
        `id` INT NOT NULL,
        `price` DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入資料
        INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
        INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
      • 建立shipments表,並插入資料。

        CREATE TABLE `shipments` (
        `id` INT NOT NULL,
        `city` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入資料
        INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
        INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
      • 建立products表,並插入資料。

        -- 
        CREATE TABLE `products` (
        `id` INT NOT NULL,
        `product` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入資料
        INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
        INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
        INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

通過Flink CDC CLI提交任務

  1. 下載所需JAR包:

    • flink-cdc包

      進入flink-cdc下載二進位壓縮包flink-cdc-3.1.1-bin.tar.gz,並解壓得到flink-cdc-3.1.1目錄,其中會包含bin、lib、log及conf四個目錄,將這四個目錄下的檔案移動至flink-1.18.0對應的目錄下。

    • Connector包

      下載以下Connector包,並移動至flink-1.18.0/lib目錄下。

      說明

      下載連結只對發行的版本有效, SNAPSHOT版本需要本地基於master或release-分支編譯。

    • Driver包

      下載MySQL Connector Java包,通過--jar參數將其傳入Flink CDC CLI,或將其放在$flink-1.18.0/lib目錄下並重啟Flink叢集,因為CDC Connectors不再包含這些Drivers。

  2. 編寫任務配置YAML檔案。下述為您提供一個整庫同步的樣本檔案mysql-to-maxcompute.yaml

    ################################################################################
    # Description: Sync MySQL all tables to MaxCompute
    ################################################################################
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db.\.*
      server-id: 5400-5404
      server-time-zone: UTC
    
    # accessId, accessKey, endpoint, project需要使用者自行填寫
    sink:
       type: maxcompute
       name: MaxComputeSink
       accessId: ${your_accessId}
       accessKey: ${your_accessKey}
       endpoint: ${your_maxcompute_endpoint}
       project: ${your_project}
       bucketsNum: 8
    
    pipeline:
      name: Sync MySQL Database to MaxCompute
      parallelism: 1
    

    參數說明:

  3. 執行下述命令,提交任務至Flink Standalone叢集。

    ./bin/flink-cdc.sh mysql-to-maxcompute.yaml

    提交成功後,返回如下資訊:

    Pipeline has been submitted to cluster.
    Job ID: f9f9689866946e25bf151ecc179ef46f
    Job Description: Sync MySQL Database to MaxCompute

    在Flink Web UI中,即可看到一個名為Sync MySQL Database to MaxCompute的任務正在運行。

  4. 在MaxCompute中執行如下SQL,查看orders、shipments及products三張表是否已被成功建立,並且可以進行資料寫入。

    -- 查看orders表
    read orders;
    
    -- 返回結果:
    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
    
    -- 查看shipments表
    read shipments;
    
    -- 返回結果
    +------------+------------+
    | id         | city       |
    +------------+------------+
    | 1          | beijing    |
    | 2          | xian       |
    +------------+------------+
    
    -- 查看products表
    read products;
    
    -- 返回結果
    +------------+------------+
    | id         | product    |
    +------------+------------+
    | 3          | Peanut     |
    | 1          | Beer       |
    | 2          | Cap        |
    +------------+------------+

同步變更操作

此處以orders表為例,為您展示在修改MySQL資料庫中的源表資料時,MaxCompute中對應的目標表資料也會即時更新。

  1. 執行如下命令,進入MySQL容器。

    docker-compose exec mysql mysql -uroot -p123456
  2. 在MySQL的orders表中插入一條資料。

    INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

    在MaxCompute中執行read orders;命令查詢orders表資料。返回結果如下:

    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 3          | 100        |
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
  3. 在MySQL的orders表中增加一個欄位。

    ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

    在MaxCompute中執行read orders;命令查詢orders表資料。返回結果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 4          | NULL       |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  4. 在MySQL的orders表中更新一條資料。

    UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

    在MaxCompute中執行read orders;命令查詢orders表資料。返回結果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  5. 在MySQL的orders表中刪除一條資料。

    DELETE FROM app_db.orders WHERE id=2;

    在MaxCompute中執行read orders;命令查詢orders表資料。返回結果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    +------------+------------+------------+

對於上述操作,在MySQL中每執行一步,就在MaxCompute中進行一次資料預覽,可以看到MaxCompute中顯示的orders表資料是即時更新的。

輪詢變更操作

Flink CDC提供了將源表的表結構或資料路由到其他表名的配置,藉助這種能力,我們能夠實現表名、庫名替換,整庫同步等功能。 下面提供一個設定檔說明:

################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*
   server-id: 5400-5404
   server-time-zone: UTC

# accessId, accessKey, endpoint, project 需要使用者自行填寫
sink:
   type: maxcompute
   name: MaxComputeSink
   accessId: ${your_accessId}
   accessKey: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   bucketsNum: 8

route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
     sink-table: ods_db.ods_products

pipeline:
   name: Sync MySQL Database to MaxCompute
   parallelism: 1

route部分的參數詳情請參見Flink CDC Route

通過上面的route配置,會將app_db.orders表的結構和資料同步至ods_db.ods_orders中。從而實現資料庫遷移的功能。 特別地,source-table支援Regex匹配多表,從而實現分庫分表同步的功能,例如下面的配置:

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

這樣,就可以將諸如app_db.order01、app_db.order02、app_db.order03的表資料匯總到ods_db.ods_orders中。

說明

目前還不支援多表中存在相同主鍵資料的情境,將在後續版本支援。

環境清理

執行完上述操作後,您需要進行環境清理。

  1. 在docker-compose.yml檔案所在的目錄下執行如下命令停止所有容器:

    docker-compose down
  2. 在Flink所在目錄flink-1.18.0下,執行如下命令停止Flink叢集:

    ./bin/stop-cluster.sh

附錄

連接器Connector配置項

配置項

是否必填

預設值

類型

描述

type

none

String

指定要使用的連接器,這裡需要設定成 maxcompute

name

none

String

Sink的名稱。

accessId

none

String

阿里雲帳號或RAM使用者的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。

accessKey

none

String

AccessKey ID對應的AccessKey Secret。

endpoint

none

String

MaxCompute服務的串連地址。您需要根據建立MaxCompute專案時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint值,請參見 Endpoint

project

none

String

MaxCompute專案名稱。您可以登入MaxCompute控制台,在工作區>專案管理頁面擷取MaxCompute專案名稱。

tunnelEndpoint

none

String

MaxCompute Tunnel服務的串連地址,通常這項配置可以根據指定的專案所在的地區進行自動路由。僅在使用代理等特殊網路環境下使用該配置。

quotaName

none

String

MaxCompute資料轉送使用的獨享資源群組名稱,如不指定該配置,則使用共用資源組。詳情可以參見購買與使用獨享Data Transmission Service資源群組

stsToken

none

String

當使用RAM角色頒發的短時有效存取權杖(STS Token)進行鑒權時,需要指定該參數。

bucketsNum

16

Integer

自動建立MaxCompute Delta表時使用的桶數。使用方式請參見近即時數倉概述

compressAlgorithm

zlib

String

寫入MaxCompute時使用的資料壓縮演算法,當前支援raw(不進行壓縮)、zlibsnappy

totalBatchSize

64MB

String

記憶體中緩衝的資料量大小,單位為分區級(非分區表單位為表級),不同分區(表)的緩衝區相互獨立,達到閾值後資料寫入到MaxCompute。

bucketBatchSize

4MB

String

記憶體中緩衝的資料量大小,單位為桶級,僅寫入Delta表時生效。不同資料桶的緩衝區相互獨立,達到閾值後將該桶資料寫入到MaxCompute。

numCommitThreads

16

Integer

Checkpoint階段,能夠同時處理的分區(表)數量。

numFlushConcurrent

4

Integer

寫入資料到MaxCompute時,能夠同時寫入的桶數量。僅寫入Delta表時生效。

retryTimes

3

Integer

當網路連結發生錯誤時,進行重試的次數。

sleepMillis

true

Long

當網路連結發生錯誤時,每次重試等待的時間,單位:毫秒。

表位置映射

連接器Connector自動建表時,使用如下映射關係,將源表的位置資訊映射到MaxCompute表中。

重要

當MaxCompute專案不支援Schema模型時,每個同步任務僅能同步一個MySQL Database。(其他資料來源同理,連接器Connector會忽略tableId.namespace資訊)。

Flink CDC中對象

MaxCompute位置

MySQL位置

設定檔中project

Project

none

TableId.namespace

Schema(僅當MaxCompute專案支援Schema模型時,如不支援,將忽略該配置)

Database

TableId.tableName

Table

Table

資料類型映射

Flink Type

MaxCompute Type

CHAR/VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT