全部產品
Search
文件中心

Realtime Compute for Apache Flink:MySQL連接器最佳實務

更新時間:Jan 27, 2026

本文為您介紹MySQL連接器在常見使用情境下的最佳實務。

設定Server ID,避免Binlog消費衝突

每個同步資料庫資料的用戶端,都會有一個唯一ID,即Server ID。如果不同作業使用了相同的Server ID,會因為衝突導致作業報錯。建議為每個MySQL CDC資料來源配置不同的Server ID。

  • Server ID配置方式

    Server ID可以在Flink建表語句中指定,也可以通過動態Hints配置。

    建議通過動態Hints來配置Server ID,而不是在建表的WITH參數中配置Server ID。動態Hints詳情請參見動態Hints

  • 不同情境下Server ID的配置

    • 未開啟增量快照框架或並行度為1

      當未開啟增量快照框架或並行度為1時,可以指定一個特定的Server ID。

      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • 開啟增量快照框架且並行度大於1

      當開啟增量快照框架且並行度大於1時,需要指定Server ID範圍,確保範圍內可用的Server ID數量不小於並行度。假設並行度為3,可以如下配置:

      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • 結合CTAS進行資料同步

      當結合CTAS進行資料同步時,如果CDC資料來源配置相同,會自動對資料來源進行合并複用,此時可以為多個CDC資料來源配置相同的Server ID。詳情請參見程式碼範例四:多CTAS語句

    • 同一作業包含多個MySQL CDC源表(非CTAS)

      當作業中包含多個MySQL CDC源表,且不是使用CTAS語句同步時,如果沒有開啟Source複用(詳情請參見開啟Source複用,減少Binlog資料連線),需要為每一個CDC源表提供不同的Server ID。同理,如果開啟增量快照框架且並行度大於1,需要指定Server ID範圍。

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;

設定分區參數,最佳化記憶體空間

MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分區(chunk),記錄下此時的Binlog位點。並使用增量快照演算法通過Select語句,逐個讀取每個分區的資料。作業會周期性執行Checkpoint,記錄下已經完成的分區。當發生Failover時,只需要繼續讀取未完成的分區。當分區全部讀取完後,會從之前擷取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。

更詳細的增量快照演算法,請參見MySQL CDC Connector

對於只有一個欄位的主鍵表,預設使用該欄位進行分區。對於有聯合主鍵的MySQL物理表,預設使用主鍵裡的第一個欄位進行分區。Flink計算引擎VVR 6.0.7及以上版本支援讀取無主鍵源表,需要設定scan.incremental.snapshot.chunk.key-column指定一個非空類型的欄位進行分區。

分區參數最佳化

分區資料和分區資訊會儲存在記憶體中,在一些情況下,可能會出現OOM的問題。可以根據出現OOM的組件進行參數調整:

  • JobManager

    JobManager儲存所有分區的資訊,如果分區數量過多會出現OOM,需要通過增加scan.incremental.snapshot.chunk.size值來減少分區數。也可以在運行參數配置中設定jobmanager.memory.heap.size以增大JobManager堆記憶體,參見Flink參數配置

  • TaskManager

    • TaskManager讀取每個分區的資料,如果分區裡資料條數過多會出現OOM,需要通過減少scan.incremental.snapshot.chunk.size值來減少分區裡的資料條數。也可以在運行參數配置中調整Task Manager Memory為更大值以增加TaskManager堆記憶體。

    • 在VVR 8.0.8及之前版本,最後一個分區需要讀取的資料量可能比較大,導致TaskManager出現OOM,建議升級到VVR 8.0.9及以上以避免該問題。

    • 對於有聯合主鍵的MySQL CDC源表,會使用主鍵裡的第一個欄位進行分區,如果存在大量的資料在該欄位中為相同欄位值的情況,對應分區的資料會更多,可能會導致TaskManager出現OOM,可以設定scan.incremental.snapshot.chunk.key-column指定主鍵中的其他欄位進行分區劃分。

調節作業配置,加速全量階段讀取

MySQL源表全量階段需要通過JDBC連結讀取快照資料,可以通過以下方式加快全量階段讀取速度。

  1. 增加source並發度,加快全量階段讀取速度。

  2. 增加scan.incremental.snapshot.chunk.size值來增加單個分區擷取的資料量。

  3. 如果下遊結果表存在主鍵且支援等冪寫入,可以開啟scan.incremental.snapshot.backfill.skip跳過backfill部分的binlog讀取,加快全量階段處理速率。

開啟Source複用,減少Binlog資料連線

在作業中包含了多張MySQL源表時,開啟Source複用能夠複用Binlog串連,從而減少資料庫的壓力。該功能僅在Realtime ComputeFlink版本提供,社區版MySQL CDC連接器不支援。

您可以在SQL作業中使用SET命令開啟Source複用功能:

SET 'table.optimizer.source-merge.enabled' = 'true';

建議在新建立的作業中就開啟Source複用功能。對已有作業啟用Source複用後,需要無狀態啟動。原因是Source複用會導致作業拓撲改變,從原有作業狀態可能無法啟動或者遺失資料。

開啟Source複用後,具有相同配置參數的MySQL源表會進行合并。如果您的作業中所有源表的配置都相同,作業的Binlog串連數可以按照如下方式計算:

  • 全量讀取階段,Binlog串連數等於Source並發度。

  • 增量讀取階段,Binlog串連數等於1。

重要
  • VVR 8.0.8及8.0.9版本,在開啟CDC Source複用時,還需要額外設定SET 'sql-gateway.exec-plan.enabled' = 'false';

  • 在開啟CDC Source複用後,不建議將作業配置項pipeline.operator-chaining設為false,因為將運算元鏈斷開後,Source發送給下遊運算元的資料會增加序列化和反序列的開銷,當合并的Source越多時,開銷會越大。

  • 在Realtime Compute引擎VVR 8.0.7版本,將pipeline.operator-chaining設為false時會出現序列化的問題。

開啟Binlog解析參數,加速增量資料讀取

MySQL連接器作為源表或資料攝入資料來源使用時,在增量階段會解析Binlog檔案產生各種變更訊息,Binlog檔案使用二進位記錄著所有表的變更,可以通過以下方式加速Binlog檔案解析。

  • 開啟並行解析和解析過濾配置(該功能僅在Realtime ComputeFlink版本提供且僅Flink計算引擎VVR 8.0.7及以上版本支援,社區版MySQL CDC連接器不支援)

    • 開啟配置項scan.only.deserialize.captured.tables.changelog.enabled:僅對指定表的變更事件進行解析。

    • 開啟配置項scan.parallel-deserialize-changelog.enabled:採用多線程對Binlog檔案進行解析,並按順序投放到消費隊列。開啟該配置時通常需要增加Task Manager CPU進行配合。

  • 最佳化Debezium參數

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size:阻塞隊列可以容納的記錄的最大數量。當Debezium從資料庫讀取事件流時,它會在將事件寫入下遊之前將它們放入阻塞隊列。預設值為8192。

    • debezium.max.batch.size:該連接器每次迭代處理的事件條數最大值。預設值為2048。

    • debezium.poll.interval.ms:連接器應該在請求新的變更事件前等待多少毫秒。預設值為1000毫秒,即1秒。

使用樣本:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium配置
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- 開啟並行解析和解析過濾
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 僅對指定表的變更事件進行解析。
    'scan.parallel-deserialize-changelog.enabled' = 'true'  -- 使用多線程對Binlog進行解析。
    ...
)

分析資料延遲,最佳化作業吞吐

在增量階段出現資料延遲時,可以按照以下步驟進行分析:

  1. 參見概覽中的currentFetchEventTimeLag和currentEmitEventTimeLag兩個指標,currentFetchEventTimeLag代表從Binlog讀取到資料的延遲,currentEmitEventTimeLag代表從Binlog讀取到作業相關的表的資料的延遲。

    情境

    詳情

    currentFetchEventTimeLag延遲較小而currentEmitEventTimeLag延遲較大,並且currentEmitEventTimeLag幾乎不更新。

    currentFetchEventTimeLag延遲較小說明從資料庫拉取Binlog的延遲較低,但是Binlog中屬於作業需要讀取的表的資料較少,因此currentEmitEventTimeLag幾乎不更新,屬於正常現象。

    currentFetchEventTimeLag延遲和currentEmitEventTimeLag延遲都比較大。

    說明Source表拉取能力較弱,可以參見本小節的後續步驟進行調優。

  2. 反壓的存在會導致Source端資料發送至下遊運算元的速率下降,您可能會觀察到sourceIdleTime周期性上升,currentFetchEventTimeLag和currentEmitEventTimeLag不斷增長。可以通過增大反壓源頭所在節點的並發度來避免該情況。

  3. 參見CPU中的TM CPU Usage指標和JVM中的TM GC Time指標,確認是否出現CPU或者記憶體資源不足的情況,可以適當增加作業資源以最佳化讀取效能,還可以開啟mini-batch參數以提升輸送量,參見高效能Flink SQL最佳化技巧

  4. 在作業中存在SinkUpsertMaterializer運算元並且存在大狀態時,會影響讀取效能,請考慮增加作業並發度或者避免使用SinkUpsertMaterializer運算元,詳情請參見避免使用SinkUpsertMaterializer。對已有作業配置去掉SinkUpsertMaterializer運算元時,需要無狀態啟動。原因是作業拓撲發生改變,從原有作業狀態可能無法啟動或者遺失資料。

開啟讀取 RDS Binlog,避免Binlog到期

使用阿里雲RDS MySQL執行個體作為Source資料來源時,支援讀取儲存在OSS的記錄備份。當指定的時間戳記或者Binlog位點對應的檔案儲存在OSS時,會自動拉取OSS記錄檔到Flink叢集本地進行讀取,當指定的時間戳記或者Binlog位點對應的檔案儲存在資料庫本地時,會自動切換到使用資料庫連接進行讀取。該功能僅在Realtime ComputeFlink版本提供,社區版MySQL CDC連接器不支援。

開啟讀取OSS記錄備份功能需要配置RDS的串連參數,使用樣本:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'rds.region-id' = 'cn-beijing',
    'rds.access-key-id' = 'your_access_key_id',
    'rds.access-key-secret' = 'your_access_key_secret',
    'rds.db-instance-id' = 'rm-xxxxxxxx',  // 資料庫執行個體id。
    'rds.main-db-id' = '12345678', // 主庫編號。
    'rds.endpoint' = 'rds.aliyuncs.com'
    ...
)

使用資料攝入進行整庫同步,表結構變更同步

對於只包含資料同步邏輯的作業,建議使用資料攝入運行,資料攝入作業基於Data Integration情境進行了深度最佳化,使用方式參見Flink CDC資料攝入作業快速入門以及Flink CDC資料攝入作業開發(公測中)

如下代碼提供了將MySQL的app_db整庫同步到Hologres的樣本,對於上遊app_db庫中的表結構變更,資料攝入作業會將該變更同步到下遊資料庫:

source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sync MySQL Database to Hologres

資料攝入連接器新增表功能

MySQL的資料攝入連接器針對兩種情境下的新增表,分別提供了配置項進行支援。

配置項

說明

備忘

scan.newly-added-table.enabled

從Checkpoint重啟時,是否同步上一次啟動時未匹配到的新增表,全增量同步處理新增表的資料。

僅支援在scan.startup.mode配置項取值為initial模式下使用,其他啟動模式下該配置不生效。

scan.binlog.newly-added-table.enabled

在增量階段,是否同步匹配到的新增表的資料,自動同步新增表資料。

  • 建議在初次啟動作業時開啟,同步作業會自動解析Create Table DDL並同步資料到下遊。如果在資料庫表建立結束後,開啟該配置重啟作業,會導致資料不全問題。

  • 在initial啟動模式下,全量階段結束前所有的DDL操作都無法同步到下遊。在全量階段建立的表,開啟了scan.binlog.newly-added-table.enabled也無法完成自動同步。

重要

scan.newly-added-table.enabledscan.binlog.newly-added-table.enabled不建議同時開啟,同時開啟會導致資料重複問題。