本文為您介紹MySQL連接器在常見使用情境下的最佳實務。
設定Server ID,避免Binlog消費衝突
每個同步資料庫資料的用戶端,都會有一個唯一ID,即Server ID。如果不同作業使用了相同的Server ID,會因為衝突導致作業報錯。建議為每個MySQL CDC資料來源配置不同的Server ID。
Server ID配置方式
Server ID可以在Flink建表語句中指定,也可以通過動態Hints配置。
建議通過動態Hints來配置Server ID,而不是在建表的WITH參數中配置Server ID。動態Hints詳情請參見動態Hints。
不同情境下Server ID的配置
未開啟增量快照框架或並行度為1
當未開啟增量快照框架或並行度為1時,可以指定一個特定的Server ID。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;開啟增量快照框架且並行度大於1
當開啟增量快照框架且並行度大於1時,需要指定Server ID範圍,確保範圍內可用的Server ID數量不小於並行度。假設並行度為3,可以如下配置:
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;結合CTAS進行資料同步
當結合CTAS進行資料同步時,如果CDC資料來源配置相同,會自動對資料來源進行合并複用,此時可以為多個CDC資料來源配置相同的Server ID。詳情請參見程式碼範例四:多CTAS語句。
同一作業包含多個MySQL CDC源表(非CTAS)
當作業中包含多個MySQL CDC源表,且不是使用CTAS語句同步時,如果沒有開啟Source複用(詳情請參見開啟Source複用,減少Binlog資料連線),需要為每一個CDC源表提供不同的Server ID。同理,如果開啟增量快照框架且並行度大於1,需要指定Server ID範圍。
select * from source_table1 /*+ OPTIONS('server-id'='123456-123457') */ left join source_table2 /*+ OPTIONS('server-id'='123458-123459') */ on source_table1.id=source_table2.id;
設定分區參數,最佳化記憶體空間
MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分區(chunk),記錄下此時的Binlog位點。並使用增量快照演算法通過Select語句,逐個讀取每個分區的資料。作業會周期性執行Checkpoint,記錄下已經完成的分區。當發生Failover時,只需要繼續讀取未完成的分區。當分區全部讀取完後,會從之前擷取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。
更詳細的增量快照演算法,請參見MySQL CDC Connector。
對於只有一個欄位的主鍵表,預設使用該欄位進行分區。對於有聯合主鍵的MySQL物理表,預設使用主鍵裡的第一個欄位進行分區。Flink計算引擎VVR 6.0.7及以上版本支援讀取無主鍵源表,需要設定scan.incremental.snapshot.chunk.key-column指定一個非空類型的欄位進行分區。
分區參數最佳化
分區資料和分區資訊會儲存在記憶體中,在一些情況下,可能會出現OOM的問題。可以根據出現OOM的組件進行參數調整:
JobManager
JobManager儲存所有分區的資訊,如果分區數量過多會出現OOM,需要通過增加scan.incremental.snapshot.chunk.size值來減少分區數。也可以在運行參數配置中設定jobmanager.memory.heap.size以增大JobManager堆記憶體,參見Flink參數配置。
TaskManager
TaskManager讀取每個分區的資料,如果分區裡資料條數過多會出現OOM,需要通過減少scan.incremental.snapshot.chunk.size值來減少分區裡的資料條數。也可以在運行參數配置中調整Task Manager Memory為更大值以增加TaskManager堆記憶體。
在VVR 8.0.8及之前版本,最後一個分區需要讀取的資料量可能比較大,導致TaskManager出現OOM,建議升級到VVR 8.0.9及以上以避免該問題。
對於有聯合主鍵的MySQL CDC源表,會使用主鍵裡的第一個欄位進行分區,如果存在大量的資料在該欄位中為相同欄位值的情況,對應分區的資料會更多,可能會導致TaskManager出現OOM,可以設定scan.incremental.snapshot.chunk.key-column指定主鍵中的其他欄位進行分區劃分。
調節作業配置,加速全量階段讀取
MySQL源表全量階段需要通過JDBC連結讀取快照資料,可以通過以下方式加快全量階段讀取速度。
增加source並發度,加快全量階段讀取速度。
增加scan.incremental.snapshot.chunk.size值來增加單個分區擷取的資料量。
如果下遊結果表存在主鍵且支援等冪寫入,可以開啟scan.incremental.snapshot.backfill.skip跳過backfill部分的binlog讀取,加快全量階段處理速率。
開啟Source複用,減少Binlog資料連線
在作業中包含了多張MySQL源表時,開啟Source複用能夠複用Binlog串連,從而減少資料庫的壓力。該功能僅在Realtime ComputeFlink版本提供,社區版MySQL CDC連接器不支援。
您可以在SQL作業中使用SET命令開啟Source複用功能:
SET 'table.optimizer.source-merge.enabled' = 'true';建議在新建立的作業中就開啟Source複用功能。對已有作業啟用Source複用後,需要無狀態啟動。原因是Source複用會導致作業拓撲改變,從原有作業狀態可能無法啟動或者遺失資料。
開啟Source複用後,具有相同配置參數的MySQL源表會進行合并。如果您的作業中所有源表的配置都相同,作業的Binlog串連數可以按照如下方式計算:
全量讀取階段,Binlog串連數等於Source並發度。
增量讀取階段,Binlog串連數等於1。
VVR 8.0.8及8.0.9版本,在開啟CDC Source複用時,還需要額外設定
SET 'sql-gateway.exec-plan.enabled' = 'false';。在開啟CDC Source複用後,不建議將作業配置項
pipeline.operator-chaining設為false,因為將運算元鏈斷開後,Source發送給下遊運算元的資料會增加序列化和反序列的開銷,當合并的Source越多時,開銷會越大。在Realtime Compute引擎VVR 8.0.7版本,將
pipeline.operator-chaining設為false時會出現序列化的問題。
開啟Binlog解析參數,加速增量資料讀取
MySQL連接器作為源表或資料攝入資料來源使用時,在增量階段會解析Binlog檔案產生各種變更訊息,Binlog檔案使用二進位記錄著所有表的變更,可以通過以下方式加速Binlog檔案解析。
開啟並行解析和解析過濾配置(該功能僅在Realtime ComputeFlink版本提供且僅Flink計算引擎VVR 8.0.7及以上版本支援,社區版MySQL CDC連接器不支援)
開啟配置項
scan.only.deserialize.captured.tables.changelog.enabled:僅對指定表的變更事件進行解析。開啟配置項
scan.parallel-deserialize-changelog.enabled:採用多線程對Binlog檔案進行解析,並按順序投放到消費隊列。開啟該配置時通常需要增加Task Manager CPU進行配合。
最佳化Debezium參數
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size:阻塞隊列可以容納的記錄的最大數量。當Debezium從資料庫讀取事件流時,它會在將事件寫入下遊之前將它們放入阻塞隊列。預設值為8192。debezium.max.batch.size:該連接器每次迭代處理的事件條數最大值。預設值為2048。debezium.poll.interval.ms:連接器應該在請求新的變更事件前等待多少毫秒。預設值為1000毫秒,即1秒。
使用樣本:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium配置
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 開啟並行解析和解析過濾
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 僅對指定表的變更事件進行解析。
'scan.parallel-deserialize-changelog.enabled' = 'true' -- 使用多線程對Binlog進行解析。
...
)分析資料延遲,最佳化作業吞吐
在增量階段出現資料延遲時,可以按照以下步驟進行分析:
參見概覽中的currentFetchEventTimeLag和currentEmitEventTimeLag兩個指標,currentFetchEventTimeLag代表從Binlog讀取到資料的延遲,currentEmitEventTimeLag代表從Binlog讀取到作業相關的表的資料的延遲。
情境
詳情
currentFetchEventTimeLag延遲較小而currentEmitEventTimeLag延遲較大,並且currentEmitEventTimeLag幾乎不更新。
currentFetchEventTimeLag延遲較小說明從資料庫拉取Binlog的延遲較低,但是Binlog中屬於作業需要讀取的表的資料較少,因此currentEmitEventTimeLag幾乎不更新,屬於正常現象。
currentFetchEventTimeLag延遲和currentEmitEventTimeLag延遲都比較大。
說明Source表拉取能力較弱,可以參見本小節的後續步驟進行調優。
反壓的存在會導致Source端資料發送至下遊運算元的速率下降,您可能會觀察到sourceIdleTime周期性上升,currentFetchEventTimeLag和currentEmitEventTimeLag不斷增長。可以通過增大反壓源頭所在節點的並發度來避免該情況。
參見CPU中的TM CPU Usage指標和JVM中的TM GC Time指標,確認是否出現CPU或者記憶體資源不足的情況,可以適當增加作業資源以最佳化讀取效能,還可以開啟mini-batch參數以提升輸送量,參見高效能Flink SQL最佳化技巧。
在作業中存在SinkUpsertMaterializer運算元並且存在大狀態時,會影響讀取效能,請考慮增加作業並發度或者避免使用SinkUpsertMaterializer運算元,詳情請參見避免使用SinkUpsertMaterializer。對已有作業配置去掉SinkUpsertMaterializer運算元時,需要無狀態啟動。原因是作業拓撲發生改變,從原有作業狀態可能無法啟動或者遺失資料。
開啟讀取 RDS Binlog,避免Binlog到期
使用阿里雲RDS MySQL執行個體作為Source資料來源時,支援讀取儲存在OSS的記錄備份。當指定的時間戳記或者Binlog位點對應的檔案儲存在OSS時,會自動拉取OSS記錄檔到Flink叢集本地進行讀取,當指定的時間戳記或者Binlog位點對應的檔案儲存在資料庫本地時,會自動切換到使用資料庫連接進行讀取。該功能僅在Realtime ComputeFlink版本提供,社區版MySQL CDC連接器不支援。
開啟讀取OSS記錄備份功能需要配置RDS的串連參數,使用樣本:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'rds.region-id' = 'cn-beijing',
'rds.access-key-id' = 'your_access_key_id',
'rds.access-key-secret' = 'your_access_key_secret',
'rds.db-instance-id' = 'rm-xxxxxxxx', // 資料庫執行個體id。
'rds.main-db-id' = '12345678', // 主庫編號。
'rds.endpoint' = 'rds.aliyuncs.com'
...
)使用資料攝入進行整庫同步,表結構變更同步
對於只包含資料同步邏輯的作業,建議使用資料攝入運行,資料攝入作業基於Data Integration情境進行了深度最佳化,使用方式參見Flink CDC資料攝入作業快速入門以及Flink CDC資料攝入作業開發(公測中)。
如下代碼提供了將MySQL的app_db整庫同步到Hologres的樣本,對於上遊app_db庫中的表結構變更,資料攝入作業會將該變更同步到下遊資料庫:
source:
type: mysql
hostname: <hostname>
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: app_db.\.*
server-id: 5400-5404
sink:
type: hologres
name: Hologres Sink
endpoint: <endpoint>
dbname: <database-name>
username: ${secret_values.holousername}
password: ${secret_values.holopassword}
pipeline:
name: Sync MySQL Database to Hologres資料攝入連接器新增表功能
MySQL的資料攝入連接器針對兩種情境下的新增表,分別提供了配置項進行支援。
配置項 | 說明 | 備忘 |
| 從Checkpoint重啟時,是否同步上一次啟動時未匹配到的新增表,全增量同步處理新增表的資料。 | 僅支援在 |
| 在增量階段,是否同步匹配到的新增表的資料,自動同步新增表資料。 |
|
scan.newly-added-table.enabled和scan.binlog.newly-added-table.enabled不建議同時開啟,同時開啟會導致資料重複問題。