Ketika aplikasi Anda memerlukan visibilitas real-time terhadap perubahan data MySQL—misalnya untuk dasbor analitik, pengindeksan pencarian, atau pemrosesan event downstream—Anda membutuhkan pipeline Change Data Capture (CDC). Canal menangkap perubahan inkremental dari log biner MySQL dan mengalirkannya ke ApsaraMQ for Kafka, sehingga konsumen downstream menerima event INSERT, UPDATE, dan DELETE tingkat baris.
Cara kerja Canal
Canal meniru database sekunder MySQL. Canal mengirim permintaan dump ke database MySQL primer, menerima event log biner, menguraikannya menjadi catatan perubahan terstruktur, lalu meneruskan catatan tersebut ke ApsaraMQ for Kafka.

Untuk informasi lebih lanjut, lihat Canal di GitHub.
Apa yang akan Anda lakukan
Panduan ini mencakup lima langkah berikut:
Konfigurasikan binary logging MySQL
Unduh dan instal Canal
Konfigurasikan koneksi MySQL
Konfigurasikan koneksi Kafka
Jalankan Canal dan verifikasi aliran data
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Database MySQL dengan binary logging yang diaktifkan dalam format
ROW(lihat Konfigurasikan binary logging MySQL)Instans ApsaraMQ for Kafka dengan setidaknya satu topik yang telah dibuat—lihat Langkah 3: Buat resource
Konektivitas jaringan antara host Canal, database MySQL, dan instans ApsaraMQ for Kafka
Konfigurasikan binary logging MySQL
Canal bergantung pada binary logging tingkat baris untuk menangkap perubahan. Tambahkan pengaturan berikut ke file konfigurasi MySQL Anda (biasanya my.cnf atau my.ini), lalu restart MySQL:
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL| Parameter | Deskripsi |
|---|---|
server-id | Bilangan bulat unik di seluruh server MySQL dan klien replikasi dalam kluster |
log_bin | Nama dasar untuk file log biner |
binlog_format | Harus ROW. Canal tidak dapat mengurai log format STATEMENT atau MIXED |
binlog_row_image | Harus FULL untuk menangkap gambar baris lengkap sebelum dan sesudah perubahan |
Buat pengguna MySQL untuk Canal dengan hak istimewa replikasi:
CREATE USER 'canal'@'%' IDENTIFIED BY '<your-password>';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;Untuk informasi lebih lanjut, lihat Panduan Cepat Mulai Canal.
Unduh dan instal Canal
Panduan ini menggunakan Canal V1.1.5. Unduh paketnya dari GitHub.
# Buat direktori instalasi
mkdir -p /home/doc/tools/canal.deployer-1.1.5
# Ekstrak paket
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5Konfigurasikan koneksi MySQL
Edit conf/example/instance.properties agar Canal mengarah ke database MySQL Anda:
vi conf/example/instance.propertiesAtur parameter berikut:
# Koneksi MySQL
canal.instance.master.address = 192.168.XX.XX:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = <your-password>
# Pemetaan topik Kafka
canal.mq.topic = mysql_test
# Routing partisi (pilih salah satu pendekatan)
# Opsi A: Kirim semua perubahan ke satu partisi
canal.mq.partition = 0
# Opsi B: Sebarkan perubahan ke beberapa partisi berdasarkan kunci primer
# canal.mq.partitionsNum = 3
# canal.mq.partitionHash = mytest.person:id,mytest.role:idReferensi parameter instance.properties
| Parameter | Wajib | Deskripsi |
|---|---|---|
canal.instance.master.address | Ya | Alamat database MySQL dalam format host:port |
canal.instance.dbUsername | Ya | Username MySQL dengan hak istimewa replikasi |
canal.instance.dbPassword | Ya | Password untuk pengguna MySQL |
canal.mq.topic | Ya | Topik target di ApsaraMQ for Kafka. Buat topik di halaman Topics pada Konsol ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 3: Buat resource |
canal.mq.dynamicTopic | Tidak | Pola regex yang mengarahkan perubahan dari tabel berbeda ke topik Kafka berbeda. Untuk detail sintaksis, lihat Referensi parameter MQ Canal |
canal.mq.partition | Tidak | Indeks partisi tetap untuk semua pesan. Tidak dapat digunakan bersamaan dengan canal.mq.partitionsNum atau canal.mq.partitionHash |
canal.mq.partitionsNum | Tidak | Jumlah total partisi untuk routing berbasis hash. Gunakan bersama canal.mq.partitionHash |
canal.mq.partitionHash | Tidak | Aturan routing hash dalam format database.table:column. Pisahkan beberapa aturan dengan koma. Untuk detail sintaksis, lihat Referensi parameter Canal MQ |
Konfigurasikan koneksi Kafka
Edit conf/canal.properties untuk menghubungkan Canal ke instans ApsaraMQ for Kafka Anda:
vi conf/canal.propertiesKonfigurasi berbeda tergantung metode akses jaringan Anda. Pilih salah satu opsi berikut:
Akses Internet (SASL_SSL) — memerlukan otentikasi dan enkripsi
Akses VPC (PLAINTEXT) — tidak memerlukan otentikasi
Untuk detail titik akhir, lihat Perbandingan antar titik akhir.
Akses Internet (SASL_SSL)
Saat terhubung melalui Internet, Canal melakukan otentikasi dengan ApsaraMQ for Kafka melalui SASL_SSL. Anda perlu mengubah tiga file.
File 1: conf/canal.properties
# Atur Canal untuk menggunakan Kafka sebagai output
canal.serverMode = kafka
# Titik akhir SSL instans ApsaraMQ for Kafka Anda
# Dapatkan ini dari bagian Informasi Titik Akhir di halaman Detail Instans
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
# Pengaturan produsen Kafka (sesuaikan jika diperlukan)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
# Otentikasi SASL_SSL
kafka.ssl.truststore.location = ../conf/kafka_client_truststore_jks
kafka.ssl.truststore.password = KafkaOnsClient
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
kafka.ssl.endpoint.identification.algorithm =File 2: bin/startup.sh
Tambahkan path konfigurasi JAAS ke variabel JAVA_OPTS:
JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"File 3: conf/kafka_client_producer_jaas.conf
Tentukan kredensial SASL untuk instans ApsaraMQ for Kafka Anda:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<your-sasl-username>"
password="<your-sasl-password>";
};Ganti placeholder dengan nilai aktual Anda:
| Placeholder | Deskripsi | Lokasi penemuannya |
|---|---|---|
<your-sasl-username> | Username SASL | Instance Details di Konsol ApsaraMQ for Kafka |
<your-sasl-password> | Password SASL | Instance Details di Konsol ApsaraMQ for Kafka |
Jika fitur Access Control List (ACL) dinonaktifkan untuk instans Anda, Anda dapat memperoleh username dan password SASL default di halaman Instance Details pada Konsol ApsaraMQ for Kafka.
Jika fitur ACL diaktifkan untuk instans Anda, pengguna SASL harus bertipe PLAIN dan memiliki izin untuk mengirim serta mengonsumsi pesan. Untuk informasi lebih lanjut, lihat Berikan izin kepada pengguna SASL.
Unduh file truststore SSL kafka.client.truststore.jks dan simpan ke direktori conf/.
Akses VPC (PLAINTEXT)
Saat terhubung dari dalam VPC, tidak diperlukan otentikasi atau enkripsi. Atur canal.serverMode dan kafka.bootstrap.servers di conf/canal.properties:
# Atur Canal untuk menggunakan Kafka sebagai output
canal.serverMode = kafka
# Titik akhir default (VPC) instans ApsaraMQ for Kafka Anda
# Dapatkan ini dari bagian Informasi Titik Akhir di halaman Detail Instans
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
# Pengaturan produsen Kafka (sesuaikan jika diperlukan)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0Referensi parameter canal.properties
| Parameter | Wajib | Deskripsi |
|---|---|---|
canal.serverMode | Ya | Jenis tujuan output. Atur ke kafka |
kafka.bootstrap.servers | Ya | Titik akhir ApsaraMQ for Kafka. Dapatkan ini dari bagian Endpoint Information di halaman Instance Details pada Konsol ApsaraMQ for Kafka. Gunakan port 9093 untuk akses Internet (SSL) atau port 9092 untuk akses VPC |
kafka.acks | Ya | Tingkat acknowledgment dari broker. 0: tanpa acknowledgment. 1: hanya acknowledgment dari leader. all: semua replika sinkron harus memberikan acknowledgment |
kafka.compression.type | Ya | Algoritma kompresi pesan. Nilai valid: none, gzip, snappy |
kafka.batch.size | Ya | Ukuran batch maksimum dalam byte. Nilai lebih besar meningkatkan throughput; nilai lebih kecil mengurangi latensi. Default: 16384 |
kafka.linger.ms | Ya | Waktu maksimum dalam milidetik untuk menunggu pesan tambahan sebelum mengirim batch. Default: 1 |
kafka.max.request.size | Ya | Ukuran maksimum permintaan produksi tunggal dalam byte. Default: 1048576 |
kafka.buffer.memory | Ya | Total memori dalam byte yang tersedia untuk buffering pesan yang belum dikirim. Default: 33554432 |
kafka.max.in.flight.requests.per.connection | Ya | Jumlah maksimum permintaan yang belum diacknowledge per koneksi. Atur ke 1 untuk menjamin pengurutan. Default: 1 |
kafka.retries | Ya | Jumlah upaya pengulangan untuk pengiriman yang gagal. 0 menonaktifkan pengulangan. Default: 0 |
kafka.ssl.truststore.location | Hanya Internet | Path ke file truststore SSL (kafka.client.truststore.jks) |
kafka.ssl.truststore.password | Hanya Internet | Password truststore. Atur ke KafkaOnsClient |
kafka.security.protocol | Hanya Internet | Protokol keamanan. Atur ke SASL_SSL untuk akses Internet |
kafka.sasl.mechanism | Hanya Internet | Mekanisme otentikasi SASL. Atur ke PLAIN |
Jalankan Canal dan verifikasi aliran data
Jalankan Canal dari direktori instalasi:
sh bin/startup.shVerifikasi bahwa Canal telah berjalan
Periksa log server Canal:
tail -f logs/canal/canal.logOutput yang diharapkan:
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......Periksa log instans Canal:
tail -f logs/example/example.logOutput yang diharapkan:
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....Uji dengan tabel sampel
Di database MySQL bernama
mysql, buat tabel uji dan masukkan data sampel:mysql> SELECT * FROM T_Student; +--------+---------+------+------+ | stuNum | stuName | age | sex | +--------+---------+------+------+ | 1 | Wang | 18 | girl | | 2 | Zhang | 17 | boy | +--------+---------+------+------+ 2 rows in set (0.00 sec)Konfirmasi bahwa Canal menangkap perubahan dengan memeriksa log meta. Setiap INSERT, UPDATE, atau DELETE menghasilkan entri baru: Output yang diharapkan: Setiap baris menunjukkan file log biner (
log.000001), offset byte (misalnya,29723), dan alamat sumber MySQL—mengonfirmasi bahwa Canal membaca perubahan.tail -f logs/example/meta.log2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306] 2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]Verifikasi bahwa pesan telah tiba di ApsaraMQ for Kafka. Buka Konsol ApsaraMQ for Kafka dan kueri pesan untuk topik Anda. Untuk informasi lebih lanjut, lihat Kueri pesan.

Setelah selesai menguji, hentikan Canal:
sh bin/stop.sh
Pemecahan masalah
| Gejala | Kemungkinan penyebab | Solusi |
|---|---|---|
| Canal berjalan tetapi tidak ada data yang mengalir ke Kafka | Binary logging MySQL tidak dalam format ROW | Atur binlog_format = ROW di my.cnf dan restart MySQL. Canal tidak dapat mengurai log format STATEMENT atau MIXED |
Authentication failed di log Canal | Kredensial SASL salah atau konfigurasi JAAS tidak ada | Verifikasi username dan password di kafka_client_producer_jaas.conf. Pastikan path JAAS diatur dengan benar di startup.sh |
Connection refused saat menghubungkan ke Kafka | Titik akhir atau port salah | Gunakan port 9093 untuk akses Internet (SSL) dan port 9092 untuk akses VPC. Verifikasi titik akhir di halaman Instance Details |
No topic error | Topik tidak ada di ApsaraMQ for Kafka | Buat topik di Konsol ApsaraMQ for Kafka sebelum menjalankan Canal |
Log Canal menunjukkan start successful tetapi meta.log tidak memiliki entri | Tidak ada perubahan yang terjadi di database MySQL setelah Canal dijalankan | Canal hanya menangkap perubahan inkremental. Jalankan pernyataan INSERT, UPDATE, atau DELETE di MySQL untuk menghasilkan event log biner |