Realtime Compute for Apache Flink通過訂閱AnalyticDB for MySQL,可以即時捕獲和處理資料庫變更資料,實現高效的資料同步和流式計算。本文為您介紹如何使用Flink訂閱AnalyticDB for MySQL Binlog。
前提條件
AnalyticDB for MySQL產品系列為企業版、基礎版、湖倉版和數倉版彈性模式。
AnalyticDB for MySQL叢集的核心版本需為3.2.1.0及以上版本。
說明請在雲原生資料倉儲AnalyticDB MySQL控制台集群信息頁面,配寘資訊地區,查看和升級核心版本。
FlinkRealtime Compute引擎需為VVR 8.0.4及以上版本。
AnalyticDB for MySQL叢集和Flink全託管工作空間需要位於同一VPC下。
使用限制
XUANWU_V2表不支援開啟Binlog,因此不能通過訂閱Binlog實現AnalyticDB for MySQL叢集XUANWU_V2表的資料同步和流式計算。
Flink僅支援處理AnalyticDB for MySQL Binlog中的所有基礎資料類型和複雜資料類型JSON。
Flink不會處理AnalyticDB for MySQL Binlog中的DDL操作記錄和分區表自動分區刪除的操作記錄。
步驟一:開啟Binlog功能
開啟Binlog功能,本文以表名為source_table為例。
說明AnalyticDB for MySQL僅支援按表開啟Binlog功能。
建表時,開啟Binlog
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;建表後,開啟Binlog
ALTER TABLE source_table BINLOG=true;(可選)修改Binlog保留時間長度。
您可以通過修改
binlog_ttl參數來調整Binlog的保留時間長度,參數預設值為6h。以下樣本表示將表source_table的Binlog保留時間長度設定為1天。ALTER TABLE source_table binlog_ttl='1d';binlog_ttl參數取值支援以下格式:毫秒:純數字。樣本:
60代表60毫秒。秒:數字+s。樣本:
30s代表30秒。小時:數字+h。樣本:
2h代表2小時。天:數字+d。樣本:
1d代表1天。
說明核心版本為3.2.1且為3.2.1.9及以上、3.2.2且為3.2.2.14及以上、3.2.3且為3.2.3.8及以上、3.2.4且為3.2.4.4及以上、3.2.5且為3.2.5.1及以上的叢集,Binlog保留時間長度上限為365天。核心版本低於上述的叢集,Binlog保留時間長度上限為21天。
建議您設定的Binlog保留時間不小於
binlog_ttl參數的預設值。若設定的保留時間過短,可能會導致檔案被清理,影響資料同步。如果您需要查看當前Binlog保留時間長度,執行語句
SHOW CREATE TABLE source_table;。
步驟二:上傳AnalyticDB for MySQL連接器到Flink
下載連接器。
在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊資料連線。
在資料連線頁面,單擊建立自訂連接器。
上傳步驟1下載的連接器。上傳完成後,單擊下一步。
單擊完成。建立完成的自訂連接器會出現在連接器列表中。
步驟三:訂閱Binlog
建立源表,串連到AnalyticDB for MySQL並讀取指定表(source_table)的Binlog資料。
說明Flink DDL中定義的主鍵必須和AnalyticDB for MySQL叢集物理表中的主鍵保持一致,主鍵一致包括主鍵和主鍵名稱一致。如果不一致,會影響資料正確性。
Flink的資料類型需要和AnalyticDB for MySQL相容。映射關係,請參見類型映射。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );WITH參數說明:
參數
是否必填
預設值
資料類型
說明
connector
是
無
STRING
使用的連接器。
這裡填寫自訂連接器,固定填寫
adb-mysql-cdc。hostname
是
無
STRING
AnalyticDB for MySQL的VPC地址。
username
是
無
STRING
AnalyticDB for MySQL資料庫帳號。
password
是
無
STRING
AnalyticDB for MySQL資料庫密碼。
database-name
是
無
STRING
AnalyticDB for MySQL資料庫名稱。
由於AnalyticDB for MySQL實現的是表級Binlog,此處僅支援設定一個資料庫。
table-name
是
無
STRING
AnalyticDB for MySQL資料庫的表名。
由於AnalyticDB for MySQL實現的是表級Binlog,此處僅支援設定一個表。
port
否
3306
INTEGER
連接埠號碼。
scan.incremental.snapshot.enabled
否
true
BOOLEAN
增量快照。
預設開啟。增量快照是一種讀取錶快照的新機制,與舊的快照機制相比,增量快照有許多優點,包括:
在讀取快照期間,Source支援並發讀取。
在讀取快照期間,Source支援進行Chunk粒度的Checkpoint。
在讀取快照之前,Source不需要擷取資料庫鎖許可權。
scan.incremental.snapshot.chunk.size
否
8096
INTEGER
錶快照的Chunk大小(包含的行數)。
當開啟增量快照讀取時,表會被切分成多個Chunk讀取。
scan.snapshot.fetch.size
否
1024
INTEGER
讀取錶快照時,每次讀取資料的最大行數。
scan.startup.mode
否
initial
STRING
消費資料的啟動模式。
取值如下:
initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。
earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。
specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置
scan.startup.specific-offset.file和scan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動。latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。
timestamp:不掃描歷史全量資料,從指定的時間戳記開始讀取Binlog。時間戳記通過
scan.startup.timestamp-millis指定,單位為毫秒(ms)。
重要使用earliest-offset、specific-offset或timestamp啟動模式時,請確保在指定的Binlog消費位置到作業啟動期間,對應表的結構保持不變,避免因表結構變更導致作業執行失敗。
scan.startup.specific-offset.file
否
無
STRING
在specific-offset啟動模式下,啟動位點的Binlog檔案名稱。
最新Binlog檔案名稱可使用
SHOW MASTER STATUS for table_name;擷取。scan.startup.specific-offset.pos
否
無
LONG
在specific-offset啟動模式下,啟動位點的Binlog檔案位置。
最新Binlog位置可使用
SHOW MASTER STATUS for table_name;擷取。scan.startup.specific-offset.skip-events
否
無
LONG
在指定的啟動位點後需要跳過的事件數目量。
scan.startup.specific-offset.skip-rows
否
無
LONG
在指定的啟動位點後需要跳過的資料行數。
scan.startup.timestamp-millis
否
無
LONG
使用指定時間模式啟動時,啟動位點的毫秒時間戳記。
使用該配置時,
scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒(ms)。server-time-zone
否
無
STRING
資料庫伺服器中的會話時區。
例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP類型如何轉成STRING類型。如果沒有設定,則使用
ZONELD.SYSTEMDEFAULT()來確定伺服器時區。debezium.min.row.count.to.stream.result
否
1000
INTEGER
當表的行數大於該值時,連接器會對結果進行串流。
若將此參數設定為
0,會跳過所有表大小檢查,始終在快照期間對所有結果進行串流。connect.timeout
否
30s
DURATION
串連資料庫伺服器逾時,重試串連之前等待逾時的最長時間。
預設單位為秒(s)。
connect.max-retries
否
3
INTEGER
串連資料庫服務時,串連失敗後重試的最大次數。
在目標端建立表,用於儲存處理後的資料。本文以AnalyticDB for MySQL作為目標端。Flink支援的連接器請參見支援的連接器。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )建立結果表,串連步驟3建立的表,用於將處理後的資料寫入到AnalyticDB for MySQL指定的表。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );結果表對應的WITH參數和映射類型的詳情,請參見:雲原生資料倉儲AnalyticDB MySQL版(ADB)3.0。
將捕獲到的來源資料變化同步到結果表,並由結果表將資料同步到目標端。
INSERT INTO adb_sink SELECT * FROM adb_source;單擊儲存。
單擊深度檢查。
深度檢查能夠檢查作業的SQL語義、網路連通性以及作業使用的表的中繼資料資訊。同時,您可以單擊結果地區的SQL最佳化,展開查看SQL風險問題提示以及對應的SQL最佳化建議。
(可選)單擊調試。
您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險。
單擊部署。
(可選)查看Binlog資訊。
說明使用以下語句查看Binlog日誌資訊時,若僅開啟Binlog功能,日誌資訊顯示為0。只有成功訂閱Binlog後,才會顯示日誌資訊。
若您需要擷取Binlog最新寫入的檔案名稱和位置資訊,請執行以下SQL語句:
SHOW MASTER STATUS FOR source_table;若您需要瞭解所有未清理的歷史Binlog檔案及其大小,請執行以下SQL語句:
SHOW BINARY LOGS FOR source_table;
類型映射
AnalyticDB for MySQL與Flink的資料類型映射關係如下:
AnalyticDB for MySQL欄位類型 | Flink欄位類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s)或NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |