本文為您介紹如何將MySQL整庫同步Kafka,從而降低多個任務對MySQL資料庫造成的壓力。
背景資訊
MySQL CDC資料表主要用於擷取MySQL資料,並可以即時同步資料表中的修改,經常用在複雜的計算情境。例如,作為一張維表和其他資料表做Join操作。在使用中,同一張MySQL表可能被多個作業依賴,當多個任務使用同一張MySQL表做處理時,MySQL資料庫會啟動多個串連,對MySQL伺服器和網路造成很大的壓力。
方案架構
為緩解上遊MySQL資料庫的壓力,阿里雲FlinkRealtime Compute已提供將MySQL整庫同步至Kafka的能力。該方案通過引入Kafka作為中介層,並採用Flink CDC資料攝入作業同步至Kafka來實現。
在一個作業中,上遊MySQL的資料即時同步至Kafka,每張MySQL表以Upsert方式寫入相應的Kafka Topic,然後使用Upsert Kafka連接器讀取Topic中的資料替代訪問MySQL表,從而有效降低多個任務對MySQL資料庫造成的壓力。

使用限制
同步的MySQL表必須包含主鍵。
支援使用自建Kafka叢集、EMR的Kafka叢集、ApsaraMQ for Kafka。使用ApsaraMQ for Kafka時,只能通過預設存取點使用。
Kafka叢集的儲存空間必須大於源表資料的儲存空間,否則會因儲存空間不足導致資料丟失。因為整庫同步Kafka建立的topic都是compacted topic,即topic的每個訊息鍵(Key)僅保留最近的一條訊息,但是資料不會到期,compacted topic裡相當於儲存了與源庫的表相同大小的資料。
實踐情境
例如,在訂單評論即時分析情境下,假設有使用者表(user),訂單表(order)和使用者評論表(feedback)三張表。各個表包含資料如下圖所示。
在展示使用者訂單資訊和使用者評論時,需要通過關聯使用者表(user)來擷取使用者名稱(name欄位)資訊。SQL樣本如下。
-- 將訂單資訊和使用者表做join,展示每個訂單的使用者名稱和商品名。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;
-- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;對於以上兩個SQL任務,user表在兩個作業中都被使用了一次。運行時,兩個作業都會讀取MySQL的全量資料和增量資料。全量讀取需要建立MySQL串連,增量讀取需要建立Binlog Client。隨著作業的不斷增多,MySQL串連和Binlog Client資源也會對應增長,會給上遊資料庫產生極大的壓力,為了緩解對上遊MySQL資料庫的壓力,通過CDAS或CTAS文法將上遊的MySQL資料即時同步到Kafka中,提供給多個下遊作業消費。
前提條件
已開通Realtime ComputeFlink版,詳情請參見開通Realtime ComputeFlink版。
已開通雲訊息佇列Kafka,詳情請參見部署訊息佇列Kafka執行個體。
已開通RDS MySQL,詳情請參見建立RDS MySQL執行個體。
Realtime ComputeFlink版、雲資料庫RDS MySQL、雲訊息佇列Kafka需要在同一VPC下。如果不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?和如何訪問公網?。
通過RAM使用者或RAM角色等身份訪問對應資源時,需要其具備對應資源的許可權。
準備工作
建立RDS MySQL執行個體並準備資料來源
建立RDS MySQL資料庫,詳情請參見建立資料庫。
為目標執行個體建立名稱為
order_dw的資料庫。準備MySQL CDC資料來源。
在目標執行個體詳情頁面,單擊上方的登入資料庫。
在彈出的DMS頁面中,填寫建立的資料庫帳號名和密碼,然後單擊登入。
登入成功後,在左側雙擊
order_dw資料庫,切換資料庫。在SQL Console地區編寫三張業務表的建表DDL以及插入的資料語句。
CREATE TABLE `user` ( id bigint not null primary key, name varchar(50) not null ); CREATE TABLE `order` ( id bigint not null primary key, product varchar(50) not null, user_id bigint not null ); CREATE TABLE `feedback` ( id bigint not null primary key, user_id bigint not null, comment varchar(50) not null ); -- 準備資料 INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry'); INSERT INTO `order` VALUES (1, 'Football', 2), (2, 'Basket', 1); INSERT INTO `feedback` VALUES (1, 1, 'Good.'), (2, 2, 'Very good');
單擊執行,單擊直接執行。
操作步驟
建立並啟動一個Flink CDC資料攝入任務,將上遊的MySQL資料即時同步到Kafka中,提供給多個下遊作業消費。整庫同步作業會自動建立topic,topic名稱支援通過route模組定義,topic分區數和副本數會使用Kafka叢集的預設配置,並且cleanup.policy會設定為compact。
預設Topic名稱
整庫同步任務建立的Kafka topic名稱格式預設是用逗號串連MySQL資料庫名和表名,如下作業會建立三個topic:order_dw.user,order_dw.order和order_dw.feedback。
在頁面,建立Flink CDC資料攝入作業,並將如下代碼拷貝到YAML編輯器。
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # 阿里雲訊息佇列Kafka版需要配置如下參數 aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}單擊右上方的部署,進行作業部署。
單擊左側導覽列的,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。
指定Topic名稱
整庫同步任務可以使用route指定每個表的topic名稱,如下作業會建立三個topic:user1,order2和feedback3。
在頁面,建立Flink CDC資料攝入作業,並將如下代碼拷貝到YAML編輯器。
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 route: - source-table: order_dw.user sink-table: user1 - source-table: order_dw.order sink-table: order2 - source-table: order_dw.feedback sink-table: feedback3 sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # 阿里雲訊息佇列Kafka版需要配置如下參數 aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}單擊右上方的部署,進行作業部署。
單擊左側導覽列的,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。
大量設定Topic名稱
整庫同步任務可以使用route批量指定產生的topic名稱模式,如下作業會建立三個topic:topic_user,topic_order和topic_feedback。
在頁面,建立Flink CDC資料攝入作業,並將如下代碼拷貝到YAML編輯器。
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 route: - source-table: order_dw.\.* sink-table: topic_<> replace-symbol: <> sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # 阿里雲訊息佇列Kafka版需要配置如下參數 aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}單擊右上方的部署,進行作業部署。
單擊左側導覽列的,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。
即時消費Kafka資料。
上遊MySQL資料庫中的資料會以JSON格式寫入Kafka中,一個Kafka Topic可以提供給多個下遊作業消費,下遊作業消費Topic中的資料來擷取資料庫表的最新資料。對於同步到Kafka的表,消費方式有以下兩種:
通過Catalog直接消費
作為源表,從Kafka Topic中讀取資料。
在頁面,建立SQL流作業,並將如下代碼拷貝到SQL編輯器。
CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; --寫入多個Sink時,必填。 -- 將訂單資訊和Kafka JSON Catalog中的使用者表做join,展示每個訂單的使用者名稱和商品名。 INSERT INTO print_user_proudct SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `order` --指定group和啟動模式 LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --指定group和啟動模式 ON `order`.value_user_id = `user`.key_id; -- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。 INSERT INTO print_user_feedback SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as feedback --指定group和啟動模式 LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --指定group和啟動模式 ON feedback.value_user_id = `user`.key_id; END; --寫入多個Sink時,必填。本樣本通過Print連接器直接列印結果,您也可以輸出到連接器的結果表中進一步分析計算。寫入多個SINK文法,詳情請參見INSERT INTO語句。
說明在直接使用時,由於可能發生了Schema變更,Kafka JSON Catalog解析出的Schema可能與MySQL對應表存在差異,例如出現已經刪除的欄位,部分欄位可能出現為null的情況。
Catalog讀取出的Schema由消費到的資料的欄位組成。如果存在刪除的欄位且訊息未到期,則會出現一些已經不存在的欄位,這樣的欄位值會為null,該情況無需特殊處理。
單擊右上方的部署,進行作業部署。
單擊左側導覽列的,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。
通過建立暫存資料表的方式消費
自訂Schema,從暫存資料表中讀取資料。
在頁面,建立SQL流作業,並將如下代碼拷貝到SQL編輯器。
CREATE TEMPORARY TABLE user_source ( key_id BIGINT, value_name STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE order_source ( key_id BIGINT, value_product STRING, value_user_id BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'order', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE feedback_source ( key_id BIGINT, value_user_id BIGINT, value_comment STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'feedback', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; --寫入多個Sink時,必填。 -- 將訂單資訊和Kafka JSON Catalog中的使用者表做join,展示每個訂單的使用者名稱和商品名。 INSERT INTO print_user_proudct SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name FROM order_source LEFT JOIN user_source ON order_source.value_user_id = user_source.key_id; -- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。 INSERT INTO print_user_feedback SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name FROM feedback_source LEFT JOIN user_source ON feedback_source.value_user_id = user_source.key_id; END; --寫入多個Sink時,必填。本樣本通過Print連接器直接列印結果,您也可以輸出到連接器的結果表中進一步分析計算。寫入多個SINK文法,詳情請參見INSERT INTO語句。
暫存資料表配置參數見下表:
參數
說明
備忘
connector
Connector類型。
固定值為kafka。
topic
對應的Topic名稱。
和Kafka JSON Catalog的描述保持一致。
properties.bootstrap.servers
Kafka Broker地址。
格式為
host:port,host:port,host:port,以英文逗號(,)分割。scan.startup.mode
Kafka讀取資料的啟動位點。
取值如下:
earliest-offset:從Kafka最早分區開始讀取。
latest-offset:從Kafka最新位點開始讀取。
group-offsets(預設值):從指定的properties.group.id已提交的位點開始讀取。
timestamp:從scan.startup.timestamp-millis指定的時間戳記開始讀取。
specific-offsets:從scan.startup.specific-offsets指定的位移量開始讀取。
說明
該參數在作業無狀態啟動時生效。作業在從checkpoint重啟或狀態恢複時,會優先使用狀態中儲存的進度恢複讀取。
key.format
Flink Kafka Connector在序列化或還原序列化Kafka的訊息鍵(Key)時使用的格式。
固定值為json。
key.fields
Kafka訊息key部分對應的源表或結果表欄位。
多個欄位名以分號(;)分隔。例如
field1;field2。key.fields-prefix
為所有Kafka訊息鍵(Key)指定自訂首碼,以避免與訊息體(Value)或Metadata欄位重名。
需要和Kafka JSON Catalog的key.fields-prefix參數值保持一致。
value.format
Flink Kafka Connector在序列化或還原序列化Kafka的訊息體(Value)時使用的格式。
固定值為json。
value.fields-prefix
為所有Kafka訊息體(Value)指定自訂首碼,以避免與訊息鍵(Key)或Metadata欄位重名。
需要和Kafka JSON Catalog的value.fields-prefix參數值保持一致。
value.fields-include
定義訊息體在處理訊息鍵欄位時的策略。
固定值為EXCEPT_KEY。表示訊息體中不包含訊息鍵的欄位。
value.json.infer-schema.flatten-nested-columns.enable
Kafka訊息體(Value)是否遞迴式地展開JSON中的嵌套列。
對應Catalog的infer-schema.flatten-nested-columns.enable參數配置值。
value.json.infer-schema.primitive-as-string
Kafka訊息體(Value)是否推導所有基本類型為String類型。
對應Catalog的infer-schema.primitive-as-string參數配置值。
單擊右上方的部署,進行作業部署。
單擊左側導覽列的,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。
查看作業結果。
單擊左側導覽列的,單擊目標作業。
在作業日誌頁簽,單擊運行Task Managers頁簽下的Path, ID的任務。
單擊日誌,在頁面搜尋
PrintSinkOutputWriter相關的日誌資訊。