All Products
Search
Document Center

ApsaraMQ for Kafka:Gunakan Kafka Connect untuk menyinkronkan data dari database MySQL ke instance ApsaraMQ for Kafka

Last Updated:Sep 02, 2025

Topik ini menjelaskan cara menggunakan konektor sumber Kafka Connect untuk menyinkronkan data dari database MySQL ke instance ApsaraMQ for Kafka.

Informasi latar belakang

Kafka Connect digunakan untuk mengimpor aliran data ke dan mengekspor aliran data dari ApsaraMQ for Kafka. Kafka Connect memanfaatkan berbagai konektor sumber untuk mengimpor data dari sistem pihak ketiga ke broker ApsaraMQ for Kafka serta berbagai konektor sink untuk mengekspor data dari broker ApsaraMQ for Kafka ke sistem pihak ketiga.system

Prasyarat

Sebelum menggunakan konektor sumber Kafka Connect untuk menyinkronkan data, pastikan langkah-langkah berikut telah dilakukan:

  • Paket konektor sumber MySQL telah diunduh.

    Catatan

    Dalam contoh ini, konektor sumber MySQL V0.5.2 digunakan.

  • Kafka Connect telah diunduh.

    Catatan

    Dalam contoh ini, Kafka Connect V0.10.2.2 digunakan.

  • Docker telah diinstal.

Langkah 1: Konfigurasi Kafka Connect

  1. Ekstrak paket konektor sumber MySQL yang diunduh ke direktori yang ditentukan.

  2. Di file konfigurasi connect-distributed.properties Kafka Connect, tentukan jalur instalasi konektor sumber MySQL.

    plugin.path=/kafka/connect/plugins
    Penting

    Pada versi sebelumnya Kafka Connect, parameter plugin.path tidak didukung. Anda harus menentukan jalur menggunakan parameter CLASSPATH.

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

Langkah 2: Mulai Kafka Connect

Setelah file konfigurasi connect-distributed.properties dikonfigurasi, pilih salah satu metode berikut untuk memulai Kafka Connect:

  • Akses dari Internet

    1. Jalankan perintah export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf" untuk mengonfigurasi java.security.auth.login.config.

    2. Jalankan perintah bin/connect-distributed.sh config/connect-distributed.properties untuk memulai Kafka Connect.

  • Akses dari virtual private cloud (VPC)

    Jalankan perintah bin/connect-distributed.sh config/connect-distributed.properties untuk memulai Kafka Connect.

Langkah 3: Instal MySQL

  1. Unduh file docker-compose-mysql.yaml.

  2. Jalankan perintah berikut untuk menginstal MySQL:

    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

Langkah 4: Konfigurasi MySQL

  1. Tambahkan konten berikut ke file konfigurasi untuk mengaktifkan pencatatan biner di MySQL dan tentukan baris sebagai mode pencatatan biner:

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. Jalankan perintah berikut untuk memberikan izin kepada pengguna MySQL:

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    Catatan

    Dalam contoh ini, nama pengguna MySQL adalah debezium dan kata sandinya adalah dbz.

Langkah 5: Mulai konektor sumber MySQL

  1. Unduh file register-mysql.json.

  2. Konfigurasikan file register-mysql.json.

    • Akses dari VPC

      ## Titik akhir instance ApsaraMQ for Kafka. Anda dapat memperoleh titik akhir di konsol ApsaraMQ for Kafka. 
      ## Titik akhir default yang Anda peroleh di konsol ApsaraMQ for Kafka. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## Anda harus membuat topik yang memiliki nama sama dengan topik yang ditentukan dalam database MySQL di konsol ApsaraMQ for Kafka terlebih dahulu. Dalam contoh ini, topik bernama server1 dibuat. 
      ## Semua perubahan tabel dicatat dalam topik dengan format server1.$DATABASE.$TABLE, seperti server1.inventory.products. 
      ## Oleh karena itu, Anda harus membuat semua topik terkait di konsol ApsaraMQ for Kafka terlebih dahulu. 
      "database.server.name": "server1",
      ## Perubahan skema dicatat dalam topik ini. 
      ## Anda harus membuat topik ini di konsol ApsaraMQ for Kafka terlebih dahulu. 
      "database.history.kafka.topic": "schema-changes-inventory"
    • Akses dari Internet

      ## Titik akhir instance ApsaraMQ for Kafka. Anda dapat memperoleh titik akhir di konsol ApsaraMQ for Kafka. Perubahan skema database disimpan pada broker ApsaraMQ for Kafka. 
      ## Titik akhir SSL yang Anda peroleh di konsol ApsaraMQ for Kafka. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## Anda harus membuat topik yang memiliki nama sama dengan topik yang ditentukan dalam database MySQL di konsol ApsaraMQ for Kafka terlebih dahulu. Dalam contoh ini, topik bernama server1 dibuat. 
      ## Semua perubahan tabel dicatat dalam topik dengan format server1.$DATABASE.$TABLE, seperti server1.testDB.products. 
      ## Oleh karena itu, Anda harus membuat semua topik terkait di konsol ApsaraMQ for Kafka terlebih dahulu. 
      "database.server.name": "server1",
      ## Perubahan skema dicatat dalam topik ini. 
      ## Anda harus membuat topik ini di konsol ApsaraMQ for Kafka terlebih dahulu. 
      "database.history.kafka.topic": "schema-changes-inventory",
      ## Tentukan konfigurasi berikut untuk mengaktifkan akses Internet berbasis SSL: 
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. Setelah mengonfigurasi file register-mysql.json, buat topik terkait di konsol ApsaraMQ for Kafka sesuai dengan konfigurasi tersebut. Untuk informasi lebih lanjut, lihat Langkah 1: Buat Topik.

    Dalam contoh ini, database database:inventory dibuat terlebih dahulu di MySQL. Database tersebut berisi tabel-tabel berikut:

    • customers

    • orders

    • products

    • products_on_hand

    Berdasarkan konfigurasi di atas, buat topik-topik berikut dengan memanggil operasi CreateTopic:

    • server1

    • server1.inventory.customers

    • server1.inventory.orders

    • server1.inventory.products

    • server1.inventory.products_on_hand

    Berdasarkan konfigurasi dalam file register-mysql.json, perubahan skema perlu disimpan di schema-changes-testDB. Oleh karena itu, buat topik schema-changes-inventory dengan memanggil operasi CreateTopic. Untuk informasi tentang cara membuat topik dengan memanggil operasi CreateTopic, lihat CreateTopic.

  4. Jalankan perintah berikut untuk memulai konektor sumber MySQL:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

Verifikasi hasil

Lakukan langkah-langkah berikut untuk memeriksa apakah ApsaraMQ for Kafka dapat menerima data dari MySQL:

  1. Ubah data tabel di MySQL.

  2. Masuk ke konsol ApsaraMQ for Kafka. Di halaman Message Query, kueri data yang telah berubah.