本教程介紹如何使用Canal將MySQL的資料同步至雲訊息佇列 Kafka 版。
背景資訊
Canal的主要用途是基於MySQL資料庫增量日誌解析,提供增量資料訂閱和消費。Canal偽裝自己為MySQL Slave,向MySQL Master發送dump請求。MySQL Master收到dump請求,開始推送Binary log給Canal,Canal解析Binary log來同步資料。Canal與雲訊息佇列 Kafka 版建立對接,您可以把MySQL更新的資料寫入到雲訊息佇列 Kafka 版中來分析。其詳細的工作原理,請參見Canal官網。
前提條件
在開始本教程前,請確保您已完成以下操作:
安裝MySQL,並進行相關初始化與設定。具體操作,請參見 Canal QuickStart。
在雲訊息佇列 Kafka 版控制台建立執行個體以及Topic資源。具體操作,請參見步驟三:建立資源。
操作步驟
下載Canal壓縮包,本教程以1.1.5版本為例。
執行以下命令,建立目錄檔案夾。本教程以/home/doc/tools/canal.deployer-1.1.5路徑為例。
mkdir -p /home/doc/tools/canal.deployer-1.1.5將Canal壓縮包複製到/home/doc/tools/canal.deployer-1.1.5路徑並解壓。
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5在/home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,編輯instance.properties檔案。
vi conf/example/instance.properties根據instance.properties參數列表配置參數。
# 根據實際情況修改為您的資料庫資訊。 ################################################# ... # 資料庫地址。 canal.instance.master.address=192.168.XX.XX:3306 # username/password為資料庫的使用者名稱和密碼。 ... canal.instance.dbUsername=**** canal.instance.dbPassword=**** ... # mq config # 您在雲訊息佇列 Kafka 版控制台建立的Topic。 canal.mq.topic=mysql_test # 針對資料庫名或者表名發送動態Topic。 #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* # 資料同步到雲訊息佇列 Kafka 版Topic的指定分區。 canal.mq.partition=0 # 以下兩個參數配置與canal.mq.partition互斥。配置以下兩個參數可以使資料發送至雲訊息佇列 Kafka 版Topic的不同分區。 #canal.mq.partitionsNum=3 #庫名.表名: 唯一主鍵,多個表之間用逗號分隔。 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################表 1. instance.properties參數列表 參數
是否必選
描述
canal.instance.master.address
是
MySQL資料庫的串連地址。
canal.instance.dbUsername
是
MySQL資料庫的使用者名稱。
canal.instance.dbPassword
是
MySQL資料庫的密碼。
canal.mq.topic
是
雲訊息佇列 Kafka 版執行個體的Topic。您可以在雲訊息佇列 Kafka 版控制台的Topic 管理頁面建立。具體操作,請參見步驟三:建立資源。
canal.mq.dynamicTopic
否
動態Topic規則運算式。設定Topic匹配規則運算式,可以將不同的資料表資料同步至不同的Topic。具體設定方法,請參見參數說明。
canal.mq.partition
否
資料庫資料同步到雲訊息佇列 Kafka 版Topic的指定分區。
canal.mq.partitionsNum
否
Topic的分區數量。該參數與canal.mq.partitionHash一起使用,可以將資料同步至雲訊息佇列 Kafka 版Topic不同的分區。
canal.mq.partitionHash
否
分區的規則運算式。具體設定方法,請參見參數說明。
執行以下命令,編輯canal.properties檔案。
vi conf/canal.properties根據canal.properties參數列表說明配置參數。
公網環境,訊息採用SASL_SSL協議進行鑒權並加密,通過SSL存取點訪問雲訊息佇列 Kafka 版。存取點的詳細資料,請參見存取點對比。
# ... # 您需設定為kafka。 canal.serverMode = kafka # ... # kafka配置。 #在雲訊息佇列 Kafka 版執行個體詳情頁面擷取的SSL存取點。 kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093 # 參數的預設設定如下所示,您可以根據實際情況調整。 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0 # 公網環境,通過SASL_SSL鑒權並加密,您需配置網路通訊協定與身份校正機制。 kafka.ssl.truststore.location= ../conf/kafka_client_truststore_jks kafka.ssl.truststore.password= KafkaOnsClient kafka.security.protocol= SASL_SSL kafka.sasl.mechanism = PLAIN kafka.ssl.endpoint.identification.algorithm =表 2. canal.properties參數列表 參數
是否必選
描述
canal.serverMode
是
您需設定為kafka。
kafka.bootstrap.servers
是
雲訊息佇列 Kafka 版執行個體存取點。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。
kafka.ssl.truststore.location
是
SSL根憑證kafka.client.truststore.jks的存放路徑。
說明公網環境下,訊息必須進行鑒權與加密,才能確保傳輸的安全。即需通過SSL存取點採用SASL_SSL協議進行傳輸。具體資訊,請參見存取點對比。
kafka.acks
是
雲訊息佇列 Kafka 版接收到資料之後給用戶端發出的確認訊號。取值說明如下:
0:表示用戶端不需要等待任何確認收到的資訊。
1:表示等待Leader成功寫入而不等待所有備份是否成功寫入。
all:表示等待Leader成功寫入並且所有備份都成功寫入。
kafka.compression.type
是
壓縮資料的壓縮演算法,預設是無壓縮。取值如下:
none。
gzip。
snappy。
kafka.batch.size
是
用戶端資料區塊攢批的大小。單位:Byte。
該參數控制批量處理訊息的位元組數。用戶端發送到Brokers的請求將包含多個批量處理,以減少請求次數。較小的批量處理數值可能降低輸送量,而較大的批量處理數值將會浪費更多記憶體空間,分配一個指定的批量處理訊息緩衝區有助於提高用戶端和服務端的效能。
說明kafka.batch.size與kafka.linger.ms參數都是控制批量處理訊息的條件,滿足其中一個參數的條件,用戶端就攢批完成,訊息進入待發送狀態。
kafka.linger.ms
是
用戶端資料區塊攢批的最大時間長度。單位:ms。
用戶端設定攢批訊息的延遲時間,以批量處理訊息,減少請求次數。
kafka.max.request.size
是
用戶端每次請求的最大位元組數。
kafka.buffer.memory
是
快取資料的記憶體大小。
kafka.max.in.flight.requests.per.connection
是
限制用戶端在單個串連上能夠發送的未響應請求的個數。設定此值是1表示Broker在響應請求之前用戶端不能再向同一個Broker發送請求。
kafka.retries
是
訊息發送失敗時,是否重複發送。設定為0,表示不會重複發送;設定大於0的值,用戶端重新發送資料。
kafka.ssl.truststore.password
是
SSL根憑證的密碼,設定為KafkaOnsClient。
kafka.security.protocol
是
採用SASL_SSL協議進行鑒權並加密,即設定為SASL_SSL。
kafka.sasl.mechanism
是
SASL身份認證的機制。SSL存取點採用PLAIN機制驗證身份。
公網環境,需通過SASL進行身份校正,需要在bin/startup.sh配置環境變數,並編輯kafka_client_producer_jaas.conf檔案,配置雲訊息佇列 Kafka 版執行個體的使用者名稱與密碼。
執行
vi bin/startup.sh命令,編輯startup.sh檔案,配置環境變數。JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"執行
vi conf/kafka_client_producer_jaas.conf命令,編輯kafka_client_producer_jaas.conf檔案,配置執行個體使用者名稱與密碼資訊。說明如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的執行個體詳情頁面擷取預設使用者的使用者名稱和密碼。
如果執行個體已開啟ACL,請確保要使用的SASL使用者為PLAIN類型且已授權收發訊息的許可權。具體資訊,請參見SASL使用者授權。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="執行個體的使用者名稱" password="執行個體的使用者名稱密碼"; };
VPC環境,訊息採用PLAINTEXT協議不鑒權不加密傳輸,通過預設存取點訪問雲訊息佇列 Kafka 版,僅需配置canal.serverMode與kafka.bootstrap.servers參數。存取點的詳細資料,請參見存取點對比。
# ... # 您需設定為kafka。 canal.serverMode = kafka # ... # kafka配置。 # 在雲訊息佇列 Kafka 版執行個體詳情頁面擷取的預設存取點。 kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 # 以下參數請您可以按照實際情況調整,也可以保持預設設定。 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0
在/home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,啟動Canal。
sh bin/startup.sh查看/home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log記錄檔,確認Canal與雲訊息佇列 Kafka 版串連成功,Canal正在運行。
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111] 2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......查看/home/doc/tools/canal.deployer-1.1.5/logs/example/example.log記錄檔,確認Canal Instance已啟動。
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
測實驗證
啟動Canal之後,進行資料同步驗證。
在MySQL資料庫,建立資料表T_Student。資料表資料樣本如下:
mysql> select * from T_Student; +--------+---------+------+------+ | stuNum | stuName | age | sex | +--------+---------+------+------+ | 1 | 小王 | 18 | girl | | 2 | 小張 | 17 | boy | +--------+---------+------+------+ 2 rows in set (0.00 sec)查看/home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log記錄檔,資料庫的每次增刪改操作,都會在meta.log中產生一條記錄,查看該日誌可以確認Canal是否有採集到資料。
tail -f example/meta.log 2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306] 2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306] 2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]登入雲訊息佇列 Kafka 版控制台,查詢訊息,確認MySQL的資料被同步在雲訊息佇列 Kafka 版。控制台查詢訊息的具體操作,請參見訊息查詢。

資料同步完畢,執行以下命令,關閉Canal。
sh bin/stop.sh