MaxCompute為您提供對接Flink CDC的新版外掛程式Connector連接器。您可以通過對接Flink CDC,將資料來源(例如MySQL)資料即時同步至MaxCompute的目標表(普通表或Delta表)。本文為您介紹MaxCompute新版外掛程式的能力支援情況與主要操作流程。
Flink CDC背景介紹
Flink CDC是一個端到端的開源即時資料整合工具,定義了一套功能完整的編程介面和ETL資料處理架構,使用者可通過提交Flink作業使用其功能,詳情請參見Flink CDC。Flink CDC深度整合並由Apache Flink驅動,提供以下核心功能:
端到端的Data Integration架構。
為Data Integration的使用者提供了易於構建作業的API。
支援在Source(資料來源)和Sink(輸出端)中處理多個表。
整庫同步。
具備表結構變更自動同步的能力(Schema Evolution)。
前提條件
已建立MaxCompute專案,詳情請參見建立MaxCompute專案。
注意事項
快速開始
本文將基於Flink CDC,快速構建MySQL到MaxCompute的Streaming ETL作業(MySQL to MaxCompute),實現Flink CDC Pipeline的編寫。其中包含整庫同步、表結構變更同步和分庫分表同步功能。
環境準備
準備Flink Standalone叢集
下載flink-1.18.0-bin-scala_2.12.tgz並解壓,解壓後得到
flink-1.18.0目錄。進入flink-1.18.0目錄,執行以下命令,將FLINK_HOME設定為flink-1.18.0的安裝目錄。export FLINK_HOME=$(pwd)在
$flink-1.18.0/conf目錄下執行vim flink-conf.yaml命令,在設定檔中追加下列參數並儲存。# 開啟checkpoint,每隔3秒做一次checkpoint # 僅作測試使用,實際作業checkpoint間隔時間不建議低於30s execution.checkpointing.interval: 3000 # 由於flink-cdc-pipeline-connector-maxcompute依賴flink通訊機制進行寫入同步, # 這裡適當增大訊息通訊逾時時間 pekko.ask.timeout: 60s執行如下命令,啟動Flink叢集。
./bin/start-cluster.sh如啟動成功,可以在http://localhost:8081/(8081為預設連接埠)訪問到Flink Web UI。
多次執行start-cluster.sh可以拉起多個TaskManager,用於並發執行。
準備MySQL環境
此處以Docker Compose的方式為例指導您準備MySQL環境。
啟動Docker鏡像後,建立一個名為
docker-compose.yaml的檔案,檔案內容如下:version: '2.1' services: mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw參數說明:
參數
描述
version
Docker版本。
image
鏡像版本,配置為debezium/example-mysql:1.1。
ports
MySQL連接埠號碼。
environment
MySQL帳號密碼。
該Docker Compose中包含的容器有:MySQL-包含商品資訊的資料庫app_db。
在docker-compose.yaml所在目錄執行如下命令,啟動所需組件:
docker-compose up -d該命令將以Detached模式自動啟動Docker Compose配置中定義的所有容器。您可以執行
docker ps命令查看上述容器是否已正常啟動。
在MySQL資料庫中準備資料
執行如下命令,進入MySQL容器。
docker-compose exec mysql mysql -uroot -p123456在MySQL中建立資料庫,並準備表資料。
建立資料庫。
CREATE DATABASE app_db; USE app_db;準備表資料。
建立orders表,並插入資料。
CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- 插入資料 INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);建立shipments表,並插入資料。
CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- 插入資料 INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');建立products表,並插入資料。
-- CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- 插入資料 INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
通過Flink CDC CLI提交任務
下載所需JAR包:
flink-cdc包
進入flink-cdc下載二進位壓縮包flink-cdc-3.1.1-bin.tar.gz,並解壓得到
flink-cdc-3.1.1目錄,其中會包含bin、lib、log及conf四個目錄,將這四個目錄下的檔案移動至flink-1.18.0對應的目錄下。Connector包
下載以下Connector包,並移動至
flink-1.18.0/lib目錄下。說明下載連結只對發行的版本有效, SNAPSHOT版本需要本地基於master或release-分支編譯。
Driver包
下載MySQL Connector Java包,通過--jar參數將其傳入Flink CDC CLI,或將其放在
$flink-1.18.0/lib目錄下並重啟Flink叢集,因為CDC Connectors不再包含這些Drivers。
編寫任務配置YAML檔案。下述為您提供一個整庫同步的樣本檔案
mysql-to-maxcompute.yaml:################################################################################ # Description: Sync MySQL all tables to MaxCompute ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* server-id: 5400-5404 server-time-zone: UTC # accessId, accessKey, endpoint, project需要使用者自行填寫 sink: type: maxcompute name: MaxComputeSink accessId: ${your_accessId} accessKey: ${your_accessKey} endpoint: ${your_maxcompute_endpoint} project: ${your_project} bucketsNum: 8 pipeline: name: Sync MySQL Database to MaxCompute parallelism: 1參數說明:
Source部分的參數配置詳情請參見Apache Flink CDC MySQL Connector。
Sink部分的參數配置方式請參見連接器Connector配置項。
執行下述命令,提交任務至Flink Standalone叢集。
./bin/flink-cdc.sh mysql-to-maxcompute.yaml提交成功後,返回如下資訊:
Pipeline has been submitted to cluster. Job ID: f9f9689866946e25bf151ecc179ef46f Job Description: Sync MySQL Database to MaxCompute在Flink Web UI中,即可看到一個名為
Sync MySQL Database to MaxCompute的任務正在運行。在MaxCompute中執行如下SQL,查看orders、shipments及products三張表是否已被成功建立,並且可以進行資料寫入。
-- 查看orders表 read orders; -- 返回結果: +------------+------------+ | id | price | +------------+------------+ | 1 | 4 | | 2 | 100 | +------------+------------+ -- 查看shipments表 read shipments; -- 返回結果 +------------+------------+ | id | city | +------------+------------+ | 1 | beijing | | 2 | xian | +------------+------------+ -- 查看products表 read products; -- 返回結果 +------------+------------+ | id | product | +------------+------------+ | 3 | Peanut | | 1 | Beer | | 2 | Cap | +------------+------------+
同步變更操作
此處以orders表為例,為您展示在修改MySQL資料庫中的源表資料時,MaxCompute中對應的目標表資料也會即時更新。
執行如下命令,進入MySQL容器。
docker-compose exec mysql mysql -uroot -p123456在MySQL的orders表中插入一條資料。
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);在MaxCompute中執行
read orders;命令查詢orders表資料。返回結果如下:+------------+------------+ | id | price | +------------+------------+ | 3 | 100 | | 1 | 4 | | 2 | 100 | +------------+------------+在MySQL的orders表中增加一個欄位。
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;在MaxCompute中執行
read orders;命令查詢orders表資料。返回結果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 4 | NULL | | 2 | 100 | NULL | +------------+------------+------------+在MySQL的orders表中更新一條資料。
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;在MaxCompute中執行
read orders;命令查詢orders表資料。返回結果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | | 2 | 100 | NULL | +------------+------------+------------+在MySQL的orders表中刪除一條資料。
DELETE FROM app_db.orders WHERE id=2;在MaxCompute中執行
read orders;命令查詢orders表資料。返回結果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | +------------+------------+------------+
對於上述操作,在MySQL中每執行一步,就在MaxCompute中進行一次資料預覽,可以看到MaxCompute中顯示的orders表資料是即時更新的。
輪詢變更操作
Flink CDC提供了將源表的表結構或資料路由到其他表名的配置,藉助這種能力,我們能夠實現表名、庫名替換,整庫同步等功能。 下面提供一個設定檔說明:
################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
# accessId, accessKey, endpoint, project 需要使用者自行填寫
sink:
type: maxcompute
name: MaxComputeSink
accessId: ${your_accessId}
accessKey: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
bucketsNum: 8
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to MaxCompute
parallelism: 1route部分的參數詳情請參見Flink CDC Route。
通過上面的route配置,會將app_db.orders表的結構和資料同步至ods_db.ods_orders中。從而實現資料庫遷移的功能。 特別地,source-table支援Regex匹配多表,從而實現分庫分表同步的功能,例如下面的配置:
route:
- source-table: app_db.order\.*
sink-table: ods_db.ods_orders這樣,就可以將諸如app_db.order01、app_db.order02、app_db.order03的表資料匯總到ods_db.ods_orders中。
目前還不支援多表中存在相同主鍵資料的情境,將在後續版本支援。
環境清理
執行完上述操作後,您需要進行環境清理。
在docker-compose.yml檔案所在的目錄下執行如下命令停止所有容器:
docker-compose down在Flink所在目錄flink-1.18.0下,執行如下命令停止Flink叢集:
./bin/stop-cluster.sh
附錄
連接器Connector配置項
配置項 | 是否必填 | 預設值 | 類型 | 描述 |
type | 是 | none | String | 指定要使用的連接器,這裡需要設定成 |
name | 否 | none | String | Sink的名稱。 |
accessId | 是 | none | String | 阿里雲帳號或RAM使用者的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。 |
accessKey | 是 | none | String | AccessKey ID對應的AccessKey Secret。 |
endpoint | 是 | none | String | MaxCompute服務的串連地址。您需要根據建立MaxCompute專案時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint值,請參見 Endpoint。 |
project | 是 | none | String | MaxCompute專案名稱。您可以登入MaxCompute控制台,在工作區>專案管理頁面擷取MaxCompute專案名稱。 |
tunnelEndpoint | 否 | none | String | MaxCompute Tunnel服務的串連地址,通常這項配置可以根據指定的專案所在的地區進行自動路由。僅在使用代理等特殊網路環境下使用該配置。 |
quotaName | 否 | none | String | MaxCompute資料轉送使用的獨享資源群組名稱,如不指定該配置,則使用共用資源組。詳情可以參見購買與使用獨享Data Transmission Service資源群組。 |
stsToken | 否 | none | String | 當使用RAM角色頒發的短時有效存取權杖(STS Token)進行鑒權時,需要指定該參數。 |
bucketsNum | 否 | 16 | Integer | 自動建立MaxCompute Delta表時使用的桶數。使用方式請參見近即時數倉概述。 |
compressAlgorithm | 否 | zlib | String | 寫入MaxCompute時使用的資料壓縮演算法,當前支援 |
totalBatchSize | 否 | 64MB | String | 記憶體中緩衝的資料量大小,單位為分區級(非分區表單位為表級),不同分區(表)的緩衝區相互獨立,達到閾值後資料寫入到MaxCompute。 |
bucketBatchSize | 否 | 4MB | String | 記憶體中緩衝的資料量大小,單位為桶級,僅寫入Delta表時生效。不同資料桶的緩衝區相互獨立,達到閾值後將該桶資料寫入到MaxCompute。 |
numCommitThreads | 否 | 16 | Integer | Checkpoint階段,能夠同時處理的分區(表)數量。 |
numFlushConcurrent | 否 | 4 | Integer | 寫入資料到MaxCompute時,能夠同時寫入的桶數量。僅寫入Delta表時生效。 |
retryTimes | 否 | 3 | Integer | 當網路連結發生錯誤時,進行重試的次數。 |
sleepMillis | 否 | true | Long | 當網路連結發生錯誤時,每次重試等待的時間,單位:毫秒。 |
表位置映射
連接器Connector自動建表時,使用如下映射關係,將源表的位置資訊映射到MaxCompute表中。
當MaxCompute專案不支援Schema模型時,每個同步任務僅能同步一個MySQL Database。(其他資料來源同理,連接器Connector會忽略tableId.namespace資訊)。
Flink CDC中對象 | MaxCompute位置 | MySQL位置 |
設定檔中project | Project | none |
TableId.namespace | Schema(僅當MaxCompute專案支援Schema模型時,如不支援,將忽略該配置) | Database |
TableId.tableName | Table | Table |
資料類型映射
Flink Type | MaxCompute Type |
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |