全部產品
Search
文件中心

Realtime Compute for Apache Flink:Flink CDC MySQL整庫同步Kafka

更新時間:Sep 19, 2025

本文為您介紹如何將MySQL整庫同步Kafka,從而降低多個任務對MySQL資料庫造成的壓力。

背景資訊

MySQL CDC資料表主要用於擷取MySQL資料,並可以即時同步資料表中的修改,經常用在複雜的計算情境。例如,作為一張維表和其他資料表做Join操作。在使用中,同一張MySQL表可能被多個作業依賴,當多個任務使用同一張MySQL表做處理時,MySQL資料庫會啟動多個串連,對MySQL伺服器和網路造成很大的壓力。

方案架構

為緩解上遊MySQL資料庫的壓力,阿里雲FlinkRealtime Compute已提供將MySQL整庫同步至Kafka的能力。該方案通過引入Kafka作為中介層,並採用Flink CDC資料攝入作業同步至Kafka來實現。

在一個作業中,上遊MySQL的資料即時同步至Kafka,每張MySQL表以Upsert方式寫入相應的Kafka Topic,然後使用Upsert Kafka連接器讀取Topic中的資料替代訪問MySQL表,從而有效降低多個任務對MySQL資料庫造成的壓力。

圖片 1

使用限制

  • 同步的MySQL表必須包含主鍵。

  • 支援使用自建Kafka叢集、EMR的Kafka叢集、ApsaraMQ for Kafka。使用ApsaraMQ for Kafka時,只能通過預設存取點使用。

  • Kafka叢集的儲存空間必須大於源表資料的儲存空間,否則會因儲存空間不足導致資料丟失。因為整庫同步Kafka建立的topic都是compacted topic,即topic的每個訊息鍵(Key)僅保留最近的一條訊息,但是資料不會到期,compacted topic裡相當於儲存了與源庫的表相同大小的資料。

實踐情境

例如,在訂單評論即時分析情境下,假設有使用者表(user),訂單表(order)和使用者評論表(feedback)三張表。各個表包含資料如下圖所示。mysql database

在展示使用者訂單資訊和使用者評論時,需要通過關聯使用者表(user)來擷取使用者名稱(name欄位)資訊。SQL樣本如下。

-- 將訂單資訊和使用者表做join,展示每個訂單的使用者名稱和商品名。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;

-- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

對於以上兩個SQL任務,user表在兩個作業中都被使用了一次。運行時,兩個作業都會讀取MySQL的全量資料和增量資料。全量讀取需要建立MySQL串連,增量讀取需要建立Binlog Client。隨著作業的不斷增多,MySQL串連和Binlog Client資源也會對應增長,會給上遊資料庫產生極大的壓力,為了緩解對上遊MySQL資料庫的壓力,通過CDAS或CTAS文法將上遊的MySQL資料即時同步到Kafka中,提供給多個下遊作業消費。

前提條件

準備工作

建立RDS MySQL執行個體並準備資料來源

  1. 建立RDS MySQL資料庫,詳情請參見建立資料庫

    為目標執行個體建立名稱為order_dw的資料庫。

  2. 準備MySQL CDC資料來源。

    1. 在目標執行個體詳情頁面,單擊上方的登入資料庫

    2. 在彈出的DMS頁面中,填寫建立的資料庫帳號名和密碼,然後單擊登入

    3. 登入成功後,在左側雙擊order_dw資料庫,切換資料庫。

    4. 在SQL Console地區編寫三張業務表的建表DDL以及插入的資料語句。

      CREATE TABLE `user` (
        id bigint not null primary key,
        name varchar(50) not null
      );
      
      CREATE TABLE `order` (
        id bigint not null primary key,
        product varchar(50) not null,
        user_id bigint not null
      );
      
      CREATE TABLE `feedback` (
        id bigint not null primary key,
        user_id bigint not null,
        comment varchar(50) not null
      );
      
      -- 準備資料
      INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry');
      
      INSERT INTO `order` VALUES
      (1, 'Football', 2),
      (2, 'Basket', 1);
      
      INSERT INTO `feedback` VALUES
      (1, 1, 'Good.'),
      (2, 2, 'Very good');
  3. 單擊執行,單擊直接執行

操作步驟

  1. 建立並啟動一個Flink CDC資料攝入任務,將上遊的MySQL資料即時同步到Kafka中,提供給多個下遊作業消費。整庫同步作業會自動建立topic,topic名稱支援通過route模組定義,topic分區數和副本數會使用Kafka叢集的預設配置,並且cleanup.policy會設定為compact。

    預設Topic名稱

    整庫同步任務建立的Kafka topic名稱格式預設是用逗號串連MySQL資料庫名和表名,如下作業會建立三個topic:order_dw.user,order_dw.order和order_dw.feedback。

    1. 資料開發 > 資料攝入頁面,建立Flink CDC資料攝入作業,並將如下代碼拷貝到YAML編輯器。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # 阿里雲訊息佇列Kafka版需要配置如下參數
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 單擊右上方的部署,進行作業部署。

    3. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動

    指定Topic名稱

    整庫同步任務可以使用route指定每個表的topic名稱,如下作業會建立三個topic:user1,order2和feedback3。

    1. 資料開發 > 資料攝入頁面,建立Flink CDC資料攝入作業,並將如下代碼拷貝到YAML編輯器。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.user
          sink-table: user1
        - source-table: order_dw.order
          sink-table: order2
        - source-table: order_dw.feedback
          sink-table: feedback3
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # 阿里雲訊息佇列Kafka版需要配置如下參數
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 單擊右上方的部署,進行作業部署。

    3. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動

    大量設定Topic名稱

    整庫同步任務可以使用route批量指定產生的topic名稱模式,如下作業會建立三個topic:topic_user,topic_order和topic_feedback。

    1. 資料開發 > 資料攝入頁面,建立Flink CDC資料攝入作業,並將如下代碼拷貝到YAML編輯器。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.\.*
          sink-table: topic_<>
          replace-symbol: <>
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # 阿里雲訊息佇列Kafka版需要配置如下參數
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 單擊右上方的部署,進行作業部署。

    3. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動

  1. 即時消費Kafka資料。

    上遊MySQL資料庫中的資料會以JSON格式寫入Kafka中,一個Kafka Topic可以提供給多個下遊作業消費,下遊作業消費Topic中的資料來擷取資料庫表的最新資料。對於同步到Kafka的表,消費方式有以下兩種:

    通過Catalog直接消費

    作為源表,從Kafka Topic中讀取資料。

    1. 資料開發 > ETL頁面,建立SQL流作業,並將如下代碼拷貝到SQL編輯器。

      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      --寫入多個Sink時,必填。
      
      -- 將訂單資訊和Kafka JSON Catalog中的使用者表做join,展示每個訂單的使用者名稱和商品名。
      INSERT INTO print_user_proudct
      SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as `order` --指定group和啟動模式
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --指定group和啟動模式
      ON `order`.value_user_id = `user`.key_id;
      
      -- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。
      INSERT INTO print_user_feedback
      SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as feedback  --指定group和啟動模式
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --指定group和啟動模式
      ON feedback.value_user_id = `user`.key_id;
      
      END;      --寫入多個Sink時,必填。

      本樣本通過Print連接器直接列印結果,您也可以輸出到連接器的結果表中進一步分析計算。寫入多個SINK文法,詳情請參見INSERT INTO語句

      說明

      在直接使用時,由於可能發生了Schema變更,Kafka JSON Catalog解析出的Schema可能與MySQL對應表存在差異,例如出現已經刪除的欄位,部分欄位可能出現為null的情況。

      Catalog讀取出的Schema由消費到的資料的欄位組成。如果存在刪除的欄位且訊息未到期,則會出現一些已經不存在的欄位,這樣的欄位值會為null,該情況無需特殊處理。

    2. 單擊右上方的部署,進行作業部署。

    3. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動

    通過建立暫存資料表的方式消費

    自訂Schema,從暫存資料表中讀取資料。

    1. 資料開發 > ETL頁面,建立SQL流作業,並將如下代碼拷貝到SQL編輯器。

      CREATE TEMPORARY TABLE user_source (
        key_id BIGINT,
        value_name STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE order_source (
        key_id  BIGINT,
        value_product STRING,
        value_user_id BIGINT  
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE feedback_source (
        key_id  BIGINT,
        value_user_id BIGINT,
        value_comment STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'feedback',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      --寫入多個Sink時,必填。
      -- 將訂單資訊和Kafka JSON Catalog中的使用者表做join,展示每個訂單的使用者名稱和商品名。
      INSERT INTO print_user_proudct
      SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
      FROM order_source LEFT JOIN user_source
      ON order_source.value_user_id = user_source.key_id;
      
      
      -- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。
      INSERT INTO print_user_feedback
      SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
      FROM feedback_source  LEFT JOIN user_source
      ON feedback_source.value_user_id = user_source.key_id;
      
      END;      --寫入多個Sink時,必填。

      本樣本通過Print連接器直接列印結果,您也可以輸出到連接器的結果表中進一步分析計算。寫入多個SINK文法,詳情請參見INSERT INTO語句

      暫存資料表配置參數見下表:

      參數

      說明

      備忘

      connector

      Connector類型。

      固定值為kafka。

      topic

      對應的Topic名稱。

      和Kafka JSON Catalog的描述保持一致。

      properties.bootstrap.servers

      Kafka Broker地址。

      格式為host:port,host:port,host:port,以英文逗號(,)分割。

      scan.startup.mode

      Kafka讀取資料的啟動位點。

      取值如下:

      • earliest-offset:從Kafka最早分區開始讀取。

      • latest-offset:從Kafka最新位點開始讀取。

      • group-offsets(預設值):從指定的properties.group.id已提交的位點開始讀取。

      • timestamp:從scan.startup.timestamp-millis指定的時間戳記開始讀取。

      • specific-offsets:從scan.startup.specific-offsets指定的位移量開始讀取。

      說明

      該參數在作業無狀態啟動時生效。作業在從checkpoint重啟或狀態恢複時,會優先使用狀態中儲存的進度恢複讀取。

      key.format

      Flink Kafka Connector在序列化或還原序列化Kafka的訊息鍵(Key)時使用的格式。

      固定值為json。

      key.fields

      Kafka訊息key部分對應的源表或結果表欄位。

      多個欄位名以分號(;)分隔。例如field1;field2

      key.fields-prefix

      為所有Kafka訊息鍵(Key)指定自訂首碼,以避免與訊息體(Value)或Metadata欄位重名。

      需要和Kafka JSON Catalog的key.fields-prefix參數值保持一致。

      value.format

      Flink Kafka Connector在序列化或還原序列化Kafka的訊息體(Value)時使用的格式。

      固定值為json。

      value.fields-prefix

      為所有Kafka訊息體(Value)指定自訂首碼,以避免與訊息鍵(Key)或Metadata欄位重名。

      需要和Kafka JSON Catalog的value.fields-prefix參數值保持一致。

      value.fields-include

      定義訊息體在處理訊息鍵欄位時的策略。

      固定值為EXCEPT_KEY。表示訊息體中不包含訊息鍵的欄位。

      value.json.infer-schema.flatten-nested-columns.enable

      Kafka訊息體(Value)是否遞迴式地展開JSON中的嵌套列。

      對應Catalog的infer-schema.flatten-nested-columns.enable參數配置值。

      value.json.infer-schema.primitive-as-string

      Kafka訊息體(Value)是否推導所有基本類型為String類型。

      對應Catalog的infer-schema.primitive-as-string參數配置值。

    2. 單擊右上方的部署,進行作業部署。

    3. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動

  2. 查看作業結果。

    1. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業。

    2. 作業日誌頁簽,單擊運行Task Managers頁簽下的Path, ID的任務。

    3. 單擊日誌,在頁面搜尋PrintSinkOutputWriter相關的日誌資訊。

      1.png

相關文檔