全部產品
Search
文件中心

ApsaraMQ for Kafka:使用Canal將MySQL的資料同步至ApsaraMQ for Kafka

更新時間:Dec 27, 2024

本教程介紹如何使用Canal將MySQL的資料同步至雲訊息佇列 Kafka 版

背景資訊

Canal的主要用途是基於MySQL資料庫增量日誌解析,提供增量資料訂閱和消費。Canal偽裝自己為MySQL Slave,向MySQL Master發送dump請求。MySQL Master收到dump請求,開始推送Binary log給Canal,Canal解析Binary log來同步資料。Canal與雲訊息佇列 Kafka 版建立對接,您可以把MySQL更新的資料寫入到雲訊息佇列 Kafka 版中來分析。其詳細的工作原理,請參見Canal官網背景介紹

前提條件

在開始本教程前,請確保您已完成以下操作:

操作步驟

  1. 下載Canal壓縮包,本教程以1.1.5版本為例。

  2. 執行以下命令,建立目錄檔案夾。本教程以/home/doc/tools/canal.deployer-1.1.5路徑為例。

    mkdir -p /home/doc/tools/canal.deployer-1.1.5
  3. Canal壓縮包複製到/home/doc/tools/canal.deployer-1.1.5路徑並解壓。

    tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5
  4. /home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,編輯instance.properties檔案。

    vi conf/example/instance.properties

    根據instance.properties參數列表配置參數。

    #  根據實際情況修改為您的資料庫資訊。
    #################################################
    ...
    # 資料庫地址。
    canal.instance.master.address=192.168.XX.XX:3306
    # username/password為資料庫的使用者名稱和密碼。
    ...
    canal.instance.dbUsername=****
    canal.instance.dbPassword=****
    ...
    # mq config
    # 您在雲訊息佇列 Kafka 版控制台建立的Topic。
    canal.mq.topic=mysql_test
    # 針對資料庫名或者表名發送動態Topic。
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    # 資料同步到雲訊息佇列 Kafka 版Topic的指定分區。
    canal.mq.partition=0
    # 以下兩個參數配置與canal.mq.partition互斥。配置以下兩個參數可以使資料發送至雲訊息佇列 Kafka 版Topic的不同分區。
    #canal.mq.partitionsNum=3
    #庫名.表名: 唯一主鍵,多個表之間用逗號分隔。
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    表 1. instance.properties參數列表

    參數

    是否必選

    描述

    canal.instance.master.address

    MySQL資料庫的串連地址。

    canal.instance.dbUsername

    MySQL資料庫的使用者名稱。

    canal.instance.dbPassword

    MySQL資料庫的密碼。

    canal.mq.topic

    雲訊息佇列 Kafka 版執行個體的Topic。您可以在雲訊息佇列 Kafka 版控制台Topic 管理頁面建立。具體操作,請參見步驟三:建立資源

    canal.mq.dynamicTopic

    動態Topic規則運算式。設定Topic匹配規則運算式,可以將不同的資料表資料同步至不同的Topic。具體設定方法,請參見參數說明

    canal.mq.partition

    資料庫資料同步到雲訊息佇列 Kafka 版Topic的指定分區。

    canal.mq.partitionsNum

    Topic的分區數量。該參數與canal.mq.partitionHash一起使用,可以將資料同步至雲訊息佇列 Kafka 版Topic不同的分區。

    canal.mq.partitionHash

    分區的規則運算式。具體設定方法,請參見參數說明

  5. 執行以下命令,編輯canal.properties檔案。

    vi conf/canal.properties

    根據canal.properties參數列表說明配置參數。

    • 公網環境,訊息採用SASL_SSL協議進行鑒權並加密,通過SSL存取點訪問雲訊息佇列 Kafka 版。存取點的詳細資料,請參見存取點對比

      # ...
      # 您需設定為kafka。
      canal.serverMode = kafka
      # ...
      # kafka配置。
      #在雲訊息佇列 Kafka 版執行個體詳情頁面擷取的SSL存取點。
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
      # 參數的預設設定如下所示,您可以根據實際情況調整。
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
      
      # 公網環境,通過SASL_SSL鑒權並加密,您需配置網路通訊協定與身份校正機制。
      kafka.ssl.truststore.location= ../conf/kafka_client_truststore_jks
      kafka.ssl.truststore.password= KafkaOnsClient
      kafka.security.protocol= SASL_SSL
      kafka.sasl.mechanism = PLAIN
      kafka.ssl.endpoint.identification.algorithm =
      表 2. canal.properties參數列表

      參數

      是否必選

      描述

      canal.serverMode

      您需設定為kafka

      kafka.bootstrap.servers

      雲訊息佇列 Kafka 版執行個體存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

      kafka.ssl.truststore.location

      SSL根憑證kafka.client.truststore.jks的存放路徑。

      說明

      公網環境下,訊息必須進行鑒權與加密,才能確保傳輸的安全。即需通過SSL存取點採用SASL_SSL協議進行傳輸。具體資訊,請參見存取點對比

      kafka.acks

      雲訊息佇列 Kafka 版接收到資料之後給用戶端發出的確認訊號。取值說明如下:

      • 0:表示用戶端不需要等待任何確認收到的資訊。

      • 1:表示等待Leader成功寫入而不等待所有備份是否成功寫入。

      • all:表示等待Leader成功寫入並且所有備份都成功寫入。

      kafka.compression.type

      壓縮資料的壓縮演算法,預設是無壓縮。取值如下:

      • none

      • gzip

      • snappy

      kafka.batch.size

      用戶端資料區塊攢批的大小。單位:Byte。

      該參數控制批量處理訊息的位元組數。用戶端發送到Brokers的請求將包含多個批量處理,以減少請求次數。較小的批量處理數值可能降低輸送量,而較大的批量處理數值將會浪費更多記憶體空間,分配一個指定的批量處理訊息緩衝區有助於提高用戶端和服務端的效能。

      說明

      kafka.batch.sizekafka.linger.ms參數都是控制批量處理訊息的條件,滿足其中一個參數的條件,用戶端就攢批完成,訊息進入待發送狀態。

      kafka.linger.ms

      用戶端資料區塊攢批的最大時間長度。單位:ms。

      用戶端設定攢批訊息的延遲時間,以批量處理訊息,減少請求次數。

      kafka.max.request.size

      用戶端每次請求的最大位元組數。

      kafka.buffer.memory

      快取資料的記憶體大小。

      kafka.max.in.flight.requests.per.connection

      限制用戶端在單個串連上能夠發送的未響應請求的個數。設定此值是1表示Broker在響應請求之前用戶端不能再向同一個Broker發送請求。

      kafka.retries

      訊息發送失敗時,是否重複發送。設定為0,表示不會重複發送;設定大於0的值,用戶端重新發送資料。

      kafka.ssl.truststore.password

      SSL根憑證的密碼,設定為KafkaOnsClient

      kafka.security.protocol

      採用SASL_SSL協議進行鑒權並加密,即設定為SASL_SSL

      kafka.sasl.mechanism

      SASL身份認證的機制。SSL存取點採用PLAIN機制驗證身份。

      公網環境,需通過SASL進行身份校正,需要在bin/startup.sh配置環境變數,並編輯kafka_client_producer_jaas.conf檔案,配置雲訊息佇列 Kafka 版執行個體的使用者名稱與密碼。

      1. 執行vi bin/startup.sh命令,編輯startup.sh檔案,配置環境變數。

        JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"
      2. 執行vi conf/kafka_client_producer_jaas.conf命令,編輯kafka_client_producer_jaas.conf檔案,配置執行個體使用者名稱與密碼資訊。

        說明
        • 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的執行個體詳情頁面擷取預設使用者的使用者名稱和密碼。

        • 如果執行個體已開啟ACL,請確保要使用的SASL使用者為PLAIN類型且已授權收發訊息的許可權。具體資訊,請參見SASL使用者授權

        KafkaClient {  org.apache.kafka.common.security.plain.PlainLoginModule required
                       username="執行個體的使用者名稱"  
                       password="執行個體的使用者名稱密碼";
        };
    • VPC環境,訊息採用PLAINTEXT協議不鑒權不加密傳輸,通過預設存取點訪問雲訊息佇列 Kafka 版,僅需配置canal.serverModekafka.bootstrap.servers參數。存取點的詳細資料,請參見存取點對比

      # ...
      # 您需設定為kafka。
      canal.serverMode = kafka
      # ...
      # kafka配置。
      # 在雲訊息佇列 Kafka 版執行個體詳情頁面擷取的預設存取點。
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
      # 以下參數請您可以按照實際情況調整,也可以保持預設設定。
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
  6. /home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,啟動Canal。

    sh bin/startup.sh
    • 查看/home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log記錄檔,確認Canal與雲訊息佇列 Kafka 版串連成功,Canal正在運行。

      2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
      2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
      2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    • 查看/home/doc/tools/canal.deployer-1.1.5/logs/example/example.log記錄檔,確認Canal Instance已啟動。

      2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
      2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
      2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
      2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

測實驗證

啟動Canal之後,進行資料同步驗證。

  1. 在MySQL資料庫,建立資料表T_Student。資料表資料樣本如下:

    mysql> select * from T_Student;
    +--------+---------+------+------+
    | stuNum | stuName | age  | sex  |
    +--------+---------+------+------+
    |      1 | 小王    |   18 | girl |
    |      2 | 小張    |   17 | boy  |
    +--------+---------+------+------+
    2 rows in set (0.00 sec)

    查看/home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log記錄檔,資料庫的每次增刪改操作,都會在meta.log中產生一條記錄,查看該日誌可以確認Canal是否有採集到資料。

    tail -f example/meta.log
    2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]
  2. 登入雲訊息佇列 Kafka 版控制台,查詢訊息,確認MySQL的資料被同步在雲訊息佇列 Kafka 版。控制台查詢訊息的具體操作,請參見訊息查詢

    查詢訊息

  3. 資料同步完畢,執行以下命令,關閉Canal。

    sh bin/stop.sh