全部產品
Search
文件中心

ApsaraMQ for Kafka:使用Kafka Connect將MySQL資料同步至雲訊息佇列 Kafka 版

更新時間:Sep 02, 2025

本教程介紹如何使用Kafka Connect的Source Connector將MySQL的資料同步至雲訊息佇列 Kafka 版

背景資訊

Kafka Connect主要用於將資料流輸入和輸出雲訊息佇列 Kafka 版。Kafka Connect主要通過各種Source Connector的實現,將資料從第三方系統輸入到Kafka Broker,通過各種Sink Connector實現,將資料從Kafka Broker中匯入到第三方系統。system

前提條件

在開始本教程前,請確保您已完成以下操作:

  • 下載MySQL Source Connector。

    說明

    本教程以0.5.2版本的MySQL Source Connector為例。

  • 下載Kafka Connect。

    說明

    本教程以0.10.2.2版本的Kafka Connect為例。

  • 安裝Docker。

步驟一:配置Kafka Connect

  1. 將下載完成的MySQL Connector解壓到指定目錄。

  2. 在Kafka Connect的設定檔connect-distributed.properties中配置外掛程式安裝位置。

    plugin.path=/kafka/connect/plugins
    重要

    Kafka Connect的早期版本不支援配置plugin.path,您需要在CLASSPATH中指定外掛程式位置。

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

步驟二:啟動Kafka Connect

在配置好connect-distributed.properties後,執行以下命令啟動Kafka Connect。

  • 公網接入

    1. 執行命令export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"設定java.security.auth.login.config

    2. 執行命令bin/connect-distributed.sh config/connect-distributed.properties啟動Kafka Connect。

  • VPC接入

    執行命令bin/connect-distributed.sh config/connect-distributed.properties啟動Kafka Connect。

步驟三:安裝MySQL

  1. 下載docker-compose-mysql.yaml

  2. 執行以下命令安裝MySQL。

    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

步驟四:配置MySQL

  1. 在設定檔中配置以下內容,開啟MySQL的binlog寫入功能,並配置binlog模式為row。

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. 執行以下命令設定MySQL的User許可權。

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    說明

    樣本中MySQL的User為debezium,密碼為dbz

步驟五:啟動MySQL Connector

  1. 下載register-mysql.json

  2. 編輯register-mysql.json

    • VPC接入

      ## 雲訊息佇列 Kafka 版存取點,通過控制台擷取。
      ## 您在控制台擷取的預設存取點。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制台建立同名Topic,在本例中建立Topic:server1。
      ## 所有Table的變更資料,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.inventory.products。
      ## 因此使用者需要提前在控制台中建立所有相關Topic。
      "database.server.name": "server1",
      ## 記錄schema變化資訊將記錄在這個Topic中。
      ## 需要提前在控制台建立。
      "database.history.kafka.topic": "schema-changes-inventory"
    • 公網接入

      ## 雲訊息佇列 Kafka 版存取點,通過控制台擷取。儲存db中schema變化資訊。
      ## 您在控制台擷取的SSL存取點。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制台建立同名Topic,在本例中建立Topic:server1。
      ## 所有Table的變更資料,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.testDB.products。
      ## 因此使用者需要提前在控制台中建立所有相關Topic。
      "database.server.name": "server1",
      ## schema變化資訊將記錄在這個Topic中。
      ## 需要提前在控制台建立。
      "database.history.kafka.topic": "schema-changes-inventory",
      ## SSL公網方式訪問配置。
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. 配置好register-mysql.json後,您需要根據配置在控制台建立相應的Topic,相關操作步驟,請參見步驟一:建立Topic

    按照本教程中的方式安裝的MySQL,您可以看到MySQL中已經提前建立好了database:inventory。其中有四張表:

    • customers

    • orders

    • products

    • products_on_hand

    根據以上配置,您需要使用OpenAPI建立Topic:

    • server1

    • server1.inventory.customers

    • server1.inventory.orders

    • server1.inventory.products

    • server1.inventory.products_on_hand

    register-mysql.json中,配置了將schema變化資訊記錄在schema-changes-testDB,因此您還需要使用OpenAPI建立Topic:schema-changes-inventory。 使用OpenAPI建立Topic,請參見CreateTopic - 建立Topic

  4. 執行以下命令啟動MySQL Connector。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

結果驗證

按照以下步驟操作確認雲訊息佇列 Kafka 版能否接收到MySQL的變更資料。

  1. 變更MySQL Table中的資料。

  2. 在控制台的訊息查詢頁面,查詢變更資料。