本教程介紹如何使用Kafka Connect的Source Connector將MySQL的資料同步至雲訊息佇列 Kafka 版。
背景資訊
Kafka Connect主要用於將資料流輸入和輸出雲訊息佇列 Kafka 版。Kafka Connect主要通過各種Source Connector的實現,將資料從第三方系統輸入到Kafka Broker,通過各種Sink Connector實現,將資料從Kafka Broker中匯入到第三方系統。
前提條件
在開始本教程前,請確保您已完成以下操作:
步驟一:配置Kafka Connect
將下載完成的MySQL Connector解壓到指定目錄。
在Kafka Connect的設定檔connect-distributed.properties中配置外掛程式安裝位置。
plugin.path=/kafka/connect/plugins重要Kafka Connect的早期版本不支援配置plugin.path,您需要在CLASSPATH中指定外掛程式位置。
export CLASSPATH=/kafka/connect/plugins/mysql-connector/*
步驟二:啟動Kafka Connect
在配置好connect-distributed.properties後,執行以下命令啟動Kafka Connect。
公網接入
執行命令
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"設定java.security.auth.login.config。執行命令
bin/connect-distributed.sh config/connect-distributed.properties啟動Kafka Connect。
VPC接入
執行命令
bin/connect-distributed.sh config/connect-distributed.properties啟動Kafka Connect。
步驟三:安裝MySQL
執行以下命令安裝MySQL。
export DEBEZIUM_VERSION=0.5 docker-compose -f docker-compose-mysql.yaml up
步驟四:配置MySQL
在設定檔中配置以下內容,開啟MySQL的binlog寫入功能,並配置binlog模式為row。
[mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1執行以下命令設定MySQL的User許可權。
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';說明樣本中MySQL的User為debezium,密碼為dbz。
步驟五:啟動MySQL Connector
編輯register-mysql.json。
VPC接入
## 雲訊息佇列 Kafka 版存取點,通過控制台擷取。 ## 您在控制台擷取的預設存取點。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 需要提前在控制台建立同名Topic,在本例中建立Topic:server1。 ## 所有Table的變更資料,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.inventory.products。 ## 因此使用者需要提前在控制台中建立所有相關Topic。 "database.server.name": "server1", ## 記錄schema變化資訊將記錄在這個Topic中。 ## 需要提前在控制台建立。 "database.history.kafka.topic": "schema-changes-inventory"公網接入
## 雲訊息佇列 Kafka 版存取點,通過控制台擷取。儲存db中schema變化資訊。 ## 您在控制台擷取的SSL存取點。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 需要提前在控制台建立同名Topic,在本例中建立Topic:server1。 ## 所有Table的變更資料,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.testDB.products。 ## 因此使用者需要提前在控制台中建立所有相關Topic。 "database.server.name": "server1", ## schema變化資訊將記錄在這個Topic中。 ## 需要提前在控制台建立。 "database.history.kafka.topic": "schema-changes-inventory", ## SSL公網方式訪問配置。 "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN",
配置好register-mysql.json後,您需要根據配置在控制台建立相應的Topic,相關操作步驟,請參見步驟一:建立Topic。
按照本教程中的方式安裝的MySQL,您可以看到MySQL中已經提前建立好了database:inventory。其中有四張表:
customers
orders
products
products_on_hand
根據以上配置,您需要使用OpenAPI建立Topic:
server1
server1.inventory.customers
server1.inventory.orders
server1.inventory.products
server1.inventory.products_on_hand
在register-mysql.json中,配置了將schema變化資訊記錄在schema-changes-testDB,因此您還需要使用OpenAPI建立Topic:schema-changes-inventory。 使用OpenAPI建立Topic,請參見CreateTopic - 建立Topic。
執行以下命令啟動MySQL Connector。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
結果驗證
按照以下步驟操作確認雲訊息佇列 Kafka 版能否接收到MySQL的變更資料。
變更MySQL Table中的資料。
在控制台的訊息查詢頁面,查詢變更資料。