All Products
Search
Document Center

ApsaraMQ for Kafka:Sinkronisasi data MySQL ke ApsaraMQ for Kafka menggunakan Canal

Last Updated:Mar 12, 2026

Ketika aplikasi Anda memerlukan visibilitas real-time terhadap perubahan data MySQL—misalnya untuk dasbor analitik, pengindeksan pencarian, atau pemrosesan event downstream—Anda membutuhkan pipeline Change Data Capture (CDC). Canal menangkap perubahan inkremental dari log biner MySQL dan mengalirkannya ke ApsaraMQ for Kafka, sehingga konsumen downstream menerima event INSERT, UPDATE, dan DELETE tingkat baris.

Cara kerja Canal

Canal meniru database sekunder MySQL. Canal mengirim permintaan dump ke database MySQL primer, menerima event log biner, menguraikannya menjadi catatan perubahan terstruktur, lalu meneruskan catatan tersebut ke ApsaraMQ for Kafka.

How Canal works

Untuk informasi lebih lanjut, lihat Canal di GitHub.

Apa yang akan Anda lakukan

Panduan ini mencakup lima langkah berikut:

  1. Konfigurasikan binary logging MySQL

  2. Unduh dan instal Canal

  3. Konfigurasikan koneksi MySQL

  4. Konfigurasikan koneksi Kafka

  5. Jalankan Canal dan verifikasi aliran data

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Database MySQL dengan binary logging yang diaktifkan dalam format ROW (lihat Konfigurasikan binary logging MySQL)

  • Instans ApsaraMQ for Kafka dengan setidaknya satu topik yang telah dibuat—lihat Langkah 3: Buat resource

  • Konektivitas jaringan antara host Canal, database MySQL, dan instans ApsaraMQ for Kafka

Konfigurasikan binary logging MySQL

Canal bergantung pada binary logging tingkat baris untuk menangkap perubahan. Tambahkan pengaturan berikut ke file konfigurasi MySQL Anda (biasanya my.cnf atau my.ini), lalu restart MySQL:

[mysqld]
server-id         = 1
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
ParameterDeskripsi
server-idBilangan bulat unik di seluruh server MySQL dan klien replikasi dalam kluster
log_binNama dasar untuk file log biner
binlog_formatHarus ROW. Canal tidak dapat mengurai log format STATEMENT atau MIXED
binlog_row_imageHarus FULL untuk menangkap gambar baris lengkap sebelum dan sesudah perubahan

Buat pengguna MySQL untuk Canal dengan hak istimewa replikasi:

CREATE USER 'canal'@'%' IDENTIFIED BY '<your-password>';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Untuk informasi lebih lanjut, lihat Panduan Cepat Mulai Canal.

Unduh dan instal Canal

Panduan ini menggunakan Canal V1.1.5. Unduh paketnya dari GitHub.

# Buat direktori instalasi
mkdir -p /home/doc/tools/canal.deployer-1.1.5

# Ekstrak paket
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5

Konfigurasikan koneksi MySQL

Edit conf/example/instance.properties agar Canal mengarah ke database MySQL Anda:

vi conf/example/instance.properties

Atur parameter berikut:

# Koneksi MySQL
canal.instance.master.address = 192.168.XX.XX:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = <your-password>

# Pemetaan topik Kafka
canal.mq.topic = mysql_test

# Routing partisi (pilih salah satu pendekatan)
# Opsi A: Kirim semua perubahan ke satu partisi
canal.mq.partition = 0

# Opsi B: Sebarkan perubahan ke beberapa partisi berdasarkan kunci primer
# canal.mq.partitionsNum = 3
# canal.mq.partitionHash = mytest.person:id,mytest.role:id

Referensi parameter instance.properties

ParameterWajibDeskripsi
canal.instance.master.addressYaAlamat database MySQL dalam format host:port
canal.instance.dbUsernameYaUsername MySQL dengan hak istimewa replikasi
canal.instance.dbPasswordYaPassword untuk pengguna MySQL
canal.mq.topicYaTopik target di ApsaraMQ for Kafka. Buat topik di halaman Topics pada Konsol ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 3: Buat resource
canal.mq.dynamicTopicTidakPola regex yang mengarahkan perubahan dari tabel berbeda ke topik Kafka berbeda. Untuk detail sintaksis, lihat Referensi parameter MQ Canal
canal.mq.partitionTidakIndeks partisi tetap untuk semua pesan. Tidak dapat digunakan bersamaan dengan canal.mq.partitionsNum atau canal.mq.partitionHash
canal.mq.partitionsNumTidakJumlah total partisi untuk routing berbasis hash. Gunakan bersama canal.mq.partitionHash
canal.mq.partitionHashTidakAturan routing hash dalam format database.table:column. Pisahkan beberapa aturan dengan koma. Untuk detail sintaksis, lihat Referensi parameter Canal MQ

Konfigurasikan koneksi Kafka

Edit conf/canal.properties untuk menghubungkan Canal ke instans ApsaraMQ for Kafka Anda:

vi conf/canal.properties

Konfigurasi berbeda tergantung metode akses jaringan Anda. Pilih salah satu opsi berikut:

  • Akses Internet (SASL_SSL) — memerlukan otentikasi dan enkripsi

  • Akses VPC (PLAINTEXT) — tidak memerlukan otentikasi

Untuk detail titik akhir, lihat Perbandingan antar titik akhir.

Akses Internet (SASL_SSL)

Saat terhubung melalui Internet, Canal melakukan otentikasi dengan ApsaraMQ for Kafka melalui SASL_SSL. Anda perlu mengubah tiga file.

File 1: conf/canal.properties

# Atur Canal untuk menggunakan Kafka sebagai output
canal.serverMode = kafka

# Titik akhir SSL instans ApsaraMQ for Kafka Anda
# Dapatkan ini dari bagian Informasi Titik Akhir di halaman Detail Instans
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093

# Pengaturan produsen Kafka (sesuaikan jika diperlukan)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

# Otentikasi SASL_SSL
kafka.ssl.truststore.location = ../conf/kafka_client_truststore_jks
kafka.ssl.truststore.password = KafkaOnsClient
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
kafka.ssl.endpoint.identification.algorithm =

File 2: bin/startup.sh

Tambahkan path konfigurasi JAAS ke variabel JAVA_OPTS:

JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"

File 3: conf/kafka_client_producer_jaas.conf

Tentukan kredensial SASL untuk instans ApsaraMQ for Kafka Anda:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-sasl-username>"
  password="<your-sasl-password>";
};

Ganti placeholder dengan nilai aktual Anda:

PlaceholderDeskripsiLokasi penemuannya
<your-sasl-username>Username SASLInstance Details di Konsol ApsaraMQ for Kafka
<your-sasl-password>Password SASLInstance Details di Konsol ApsaraMQ for Kafka
Catatan
  • Jika fitur Access Control List (ACL) dinonaktifkan untuk instans Anda, Anda dapat memperoleh username dan password SASL default di halaman Instance Details pada Konsol ApsaraMQ for Kafka.

  • Jika fitur ACL diaktifkan untuk instans Anda, pengguna SASL harus bertipe PLAIN dan memiliki izin untuk mengirim serta mengonsumsi pesan. Untuk informasi lebih lanjut, lihat Berikan izin kepada pengguna SASL.

Unduh file truststore SSL kafka.client.truststore.jks dan simpan ke direktori conf/.

Akses VPC (PLAINTEXT)

Saat terhubung dari dalam VPC, tidak diperlukan otentikasi atau enkripsi. Atur canal.serverMode dan kafka.bootstrap.servers di conf/canal.properties:

# Atur Canal untuk menggunakan Kafka sebagai output
canal.serverMode = kafka

# Titik akhir default (VPC) instans ApsaraMQ for Kafka Anda
# Dapatkan ini dari bagian Informasi Titik Akhir di halaman Detail Instans
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

# Pengaturan produsen Kafka (sesuaikan jika diperlukan)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

Referensi parameter canal.properties

ParameterWajibDeskripsi
canal.serverModeYaJenis tujuan output. Atur ke kafka
kafka.bootstrap.serversYaTitik akhir ApsaraMQ for Kafka. Dapatkan ini dari bagian Endpoint Information di halaman Instance Details pada Konsol ApsaraMQ for Kafka. Gunakan port 9093 untuk akses Internet (SSL) atau port 9092 untuk akses VPC
kafka.acksYaTingkat acknowledgment dari broker. 0: tanpa acknowledgment. 1: hanya acknowledgment dari leader. all: semua replika sinkron harus memberikan acknowledgment
kafka.compression.typeYaAlgoritma kompresi pesan. Nilai valid: none, gzip, snappy
kafka.batch.sizeYaUkuran batch maksimum dalam byte. Nilai lebih besar meningkatkan throughput; nilai lebih kecil mengurangi latensi. Default: 16384
kafka.linger.msYaWaktu maksimum dalam milidetik untuk menunggu pesan tambahan sebelum mengirim batch. Default: 1
kafka.max.request.sizeYaUkuran maksimum permintaan produksi tunggal dalam byte. Default: 1048576
kafka.buffer.memoryYaTotal memori dalam byte yang tersedia untuk buffering pesan yang belum dikirim. Default: 33554432
kafka.max.in.flight.requests.per.connectionYaJumlah maksimum permintaan yang belum diacknowledge per koneksi. Atur ke 1 untuk menjamin pengurutan. Default: 1
kafka.retriesYaJumlah upaya pengulangan untuk pengiriman yang gagal. 0 menonaktifkan pengulangan. Default: 0
kafka.ssl.truststore.locationHanya InternetPath ke file truststore SSL (kafka.client.truststore.jks)
kafka.ssl.truststore.passwordHanya InternetPassword truststore. Atur ke KafkaOnsClient
kafka.security.protocolHanya InternetProtokol keamanan. Atur ke SASL_SSL untuk akses Internet
kafka.sasl.mechanismHanya InternetMekanisme otentikasi SASL. Atur ke PLAIN

Jalankan Canal dan verifikasi aliran data

Jalankan Canal dari direktori instalasi:

sh bin/startup.sh

Verifikasi bahwa Canal telah berjalan

Periksa log server Canal:

tail -f logs/canal/canal.log

Output yang diharapkan:

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Periksa log instans Canal:

tail -f logs/example/example.log

Output yang diharapkan:

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

Uji dengan tabel sampel

  1. Di database MySQL bernama mysql, buat tabel uji dan masukkan data sampel:

        mysql> SELECT * FROM T_Student;
        +--------+---------+------+------+
        | stuNum | stuName | age  | sex  |
        +--------+---------+------+------+
        |      1 | Wang    |   18 | girl |
        |      2 | Zhang   |   17 | boy  |
        +--------+---------+------+------+
        2 rows in set (0.00 sec)
  2. Konfirmasi bahwa Canal menangkap perubahan dengan memeriksa log meta. Setiap INSERT, UPDATE, atau DELETE menghasilkan entri baru: Output yang diharapkan: Setiap baris menunjukkan file log biner (log.000001), offset byte (misalnya, 29723), dan alamat sumber MySQL—mengonfirmasi bahwa Canal membaca perubahan.

        tail -f logs/example/meta.log
        2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
        2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
  3. Verifikasi bahwa pesan telah tiba di ApsaraMQ for Kafka. Buka Konsol ApsaraMQ for Kafka dan kueri pesan untuk topik Anda. Untuk informasi lebih lanjut, lihat Kueri pesan.

    Query messages in the ApsaraMQ for Kafka console

  4. Setelah selesai menguji, hentikan Canal:

        sh bin/stop.sh

Pemecahan masalah

GejalaKemungkinan penyebabSolusi
Canal berjalan tetapi tidak ada data yang mengalir ke KafkaBinary logging MySQL tidak dalam format ROWAtur binlog_format = ROW di my.cnf dan restart MySQL. Canal tidak dapat mengurai log format STATEMENT atau MIXED
Authentication failed di log CanalKredensial SASL salah atau konfigurasi JAAS tidak adaVerifikasi username dan password di kafka_client_producer_jaas.conf. Pastikan path JAAS diatur dengan benar di startup.sh
Connection refused saat menghubungkan ke KafkaTitik akhir atau port salahGunakan port 9093 untuk akses Internet (SSL) dan port 9092 untuk akses VPC. Verifikasi titik akhir di halaman Instance Details
No topic errorTopik tidak ada di ApsaraMQ for KafkaBuat topik di Konsol ApsaraMQ for Kafka sebelum menjalankan Canal
Log Canal menunjukkan start successful tetapi meta.log tidak memiliki entriTidak ada perubahan yang terjadi di database MySQL setelah Canal dijalankanCanal hanya menangkap perubahan inkremental. Jalankan pernyataan INSERT, UPDATE, atau DELETE di MySQL untuk menghasilkan event log biner

Topik terkait