Topik ini menjelaskan cara menggunakan konektor sumber Kafka Connect untuk menyinkronkan data dari database MySQL ke instance ApsaraMQ for Kafka.
Informasi latar belakang
Kafka Connect digunakan untuk mengimpor aliran data ke dan mengekspor aliran data dari ApsaraMQ for Kafka. Kafka Connect memanfaatkan berbagai konektor sumber untuk mengimpor data dari sistem pihak ketiga ke broker ApsaraMQ for Kafka serta berbagai konektor sink untuk mengekspor data dari broker ApsaraMQ for Kafka ke sistem pihak ketiga.
Prasyarat
Sebelum menggunakan konektor sumber Kafka Connect untuk menyinkronkan data, pastikan langkah-langkah berikut telah dilakukan:
Langkah 1: Konfigurasi Kafka Connect
Ekstrak paket konektor sumber MySQL yang diunduh ke direktori yang ditentukan.
Di file konfigurasi connect-distributed.properties Kafka Connect, tentukan jalur instalasi konektor sumber MySQL.
plugin.path=/kafka/connect/pluginsPentingPada versi sebelumnya Kafka Connect, parameter plugin.path tidak didukung. Anda harus menentukan jalur menggunakan parameter CLASSPATH.
export CLASSPATH=/kafka/connect/plugins/mysql-connector/*
Langkah 2: Mulai Kafka Connect
Setelah file konfigurasi connect-distributed.properties dikonfigurasi, pilih salah satu metode berikut untuk memulai Kafka Connect:
Akses dari Internet
Jalankan perintah
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"untuk mengonfigurasi java.security.auth.login.config.Jalankan perintah
bin/connect-distributed.sh config/connect-distributed.propertiesuntuk memulai Kafka Connect.
Akses dari virtual private cloud (VPC)
Jalankan perintah
bin/connect-distributed.sh config/connect-distributed.propertiesuntuk memulai Kafka Connect.
Langkah 3: Instal MySQL
Unduh file docker-compose-mysql.yaml.
Jalankan perintah berikut untuk menginstal MySQL:
export DEBEZIUM_VERSION=0.5 docker-compose -f docker-compose-mysql.yaml up
Langkah 4: Konfigurasi MySQL
Tambahkan konten berikut ke file konfigurasi untuk mengaktifkan pencatatan biner di MySQL dan tentukan baris sebagai mode pencatatan biner:
[mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1Jalankan perintah berikut untuk memberikan izin kepada pengguna MySQL:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';CatatanDalam contoh ini, nama pengguna MySQL adalah debezium dan kata sandinya adalah dbz.
Langkah 5: Mulai konektor sumber MySQL
Unduh file register-mysql.json.
Konfigurasikan file register-mysql.json.
Akses dari VPC
## Titik akhir instance ApsaraMQ for Kafka. Anda dapat memperoleh titik akhir di konsol ApsaraMQ for Kafka. ## Titik akhir default yang Anda peroleh di konsol ApsaraMQ for Kafka. "database.history.kafka.bootstrap.servers" : "kafka:9092", ## Anda harus membuat topik yang memiliki nama sama dengan topik yang ditentukan dalam database MySQL di konsol ApsaraMQ for Kafka terlebih dahulu. Dalam contoh ini, topik bernama server1 dibuat. ## Semua perubahan tabel dicatat dalam topik dengan format server1.$DATABASE.$TABLE, seperti server1.inventory.products. ## Oleh karena itu, Anda harus membuat semua topik terkait di konsol ApsaraMQ for Kafka terlebih dahulu. "database.server.name": "server1", ## Perubahan skema dicatat dalam topik ini. ## Anda harus membuat topik ini di konsol ApsaraMQ for Kafka terlebih dahulu. "database.history.kafka.topic": "schema-changes-inventory"Akses dari Internet
## Titik akhir instance ApsaraMQ for Kafka. Anda dapat memperoleh titik akhir di konsol ApsaraMQ for Kafka. Perubahan skema database disimpan pada broker ApsaraMQ for Kafka. ## Titik akhir SSL yang Anda peroleh di konsol ApsaraMQ for Kafka. "database.history.kafka.bootstrap.servers" : "kafka:9092", ## Anda harus membuat topik yang memiliki nama sama dengan topik yang ditentukan dalam database MySQL di konsol ApsaraMQ for Kafka terlebih dahulu. Dalam contoh ini, topik bernama server1 dibuat. ## Semua perubahan tabel dicatat dalam topik dengan format server1.$DATABASE.$TABLE, seperti server1.testDB.products. ## Oleh karena itu, Anda harus membuat semua topik terkait di konsol ApsaraMQ for Kafka terlebih dahulu. "database.server.name": "server1", ## Perubahan skema dicatat dalam topik ini. ## Anda harus membuat topik ini di konsol ApsaraMQ for Kafka terlebih dahulu. "database.history.kafka.topic": "schema-changes-inventory", ## Tentukan konfigurasi berikut untuk mengaktifkan akses Internet berbasis SSL: "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN",
Setelah mengonfigurasi file register-mysql.json, buat topik terkait di konsol ApsaraMQ for Kafka sesuai dengan konfigurasi tersebut. Untuk informasi lebih lanjut, lihat Langkah 1: Buat Topik.
Dalam contoh ini, database database:inventory dibuat terlebih dahulu di MySQL. Database tersebut berisi tabel-tabel berikut:
customers
orders
products
products_on_hand
Berdasarkan konfigurasi di atas, buat topik-topik berikut dengan memanggil operasi CreateTopic:
server1
server1.inventory.customers
server1.inventory.orders
server1.inventory.products
server1.inventory.products_on_hand
Berdasarkan konfigurasi dalam file register-mysql.json, perubahan skema perlu disimpan di schema-changes-testDB. Oleh karena itu, buat topik schema-changes-inventory dengan memanggil operasi CreateTopic. Untuk informasi tentang cara membuat topik dengan memanggil operasi CreateTopic, lihat CreateTopic.
Jalankan perintah berikut untuk memulai konektor sumber MySQL:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
Verifikasi hasil
Lakukan langkah-langkah berikut untuk memeriksa apakah ApsaraMQ for Kafka dapat menerima data dari MySQL:
Ubah data tabel di MySQL.
Masuk ke konsol ApsaraMQ for Kafka. Di halaman Message Query, kueri data yang telah berubah.