Canal digunakan untuk berlangganan dan mengonsumsi data tambahan dengan cara menguraikan log binari dari database MySQL. Awalnya, Alibaba Group perlu menyinkronkan data antara pusat data di Hangzhou dan Amerika Serikat menggunakan pendekatan berbasis pemicu bisnis. Sejak tahun 2010, metode ini berkembang menjadi penguraian log database untuk meningkatkan efisiensi langganan dan konsumsi data tambahan dalam database. Canal mendukung database sumber yang menggunakan mesin MySQL versi 5.1.x, 5.5.x, 5.6.x, 5.7.x, dan 8.0.x.
Informasi latar belakang
Canal digunakan untuk berlangganan dan mengonsumsi data tambahan dengan cara menguraikan log binari dari database MySQL. Awalnya, Alibaba Group perlu menyinkronkan data antara pusat data di Hangzhou dan Amerika Serikat menggunakan pendekatan berbasis pemicu bisnis. Sejak tahun 2010, metode ini berkembang menjadi penguraian log database untuk meningkatkan efisiensi langganan dan konsumsi data tambahan dalam database. Canal mendukung database sumber yang menggunakan mesin MySQL versi 5.1.x, 5.5.x, 5.6.x, 5.7.x, dan 8.0.x.
Canal memungkinkan Anda menulis data ke Kafka, dan DataHub kompatibel dengan protokol Kafka. Oleh karena itu, Anda dapat menggunakan Canal untuk menulis data tambahan dari MySQL ke DataHub. Untuk memastikan bahwa Canal dapat menulis data ke DataHub seperti halnya ke Kafka, beberapa perubahan telah dilakukan pada kerangka kerja Canal open source:
Nama Topik Kafka sesuai dengan format NamaProyek.NamaTopik di DataHub. Oleh karena itu, logika yang digunakan untuk mengganti titik (.) dalam Nama Topik Kafka dengan garis bawah (_) dihapus dari kerangka kerja Canal open source. Perubahan ini memastikan bahwa Nama Topik Kafka dapat dipetakan ke topik DataHub yang benar.
DataHub menggunakan PLAIN Simple Authentication and Security Layer (SASL) untuk otentikasi. Oleh karena itu, variabel lingkungan
-Djava.security.auth.login.config=$kafka_jaas_confditambahkan ke skrip startup.
Petunjuk
Topik ini memberikan contoh dasar tentang cara menggunakan Canal untuk menulis data ke DataHub seperti Kafka. Untuk informasi lebih lanjut tentang parameter dan deskripsi parameter, lihat canal.
1. Unduh paket canal.deployer
Unduh paket canal.deployer-1.1.5-SNAPSHOT.tar.gz. Versi Canal yang belum dimodifikasi untuk DataHub mungkin tidak dapat menulis data ke DataHub.
2. Salin paket canal.deployer ke direktori tetap dan ekstrak paket tersebut
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal3. Ubah parameter
3.1 Ubah File Konfigurasi Instance conf/example/instance.properties
# Ubah informasi database sesuai kebutuhan.
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password: nama pengguna dan kata sandi database.
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# konfigurasi mq
canal.mq.topic=test_project.test_topic
# Tentukan topik dinamis berdasarkan nama database atau nama tabel.
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# konfigurasi partisi hash
#canal.mq.partitionsNum=3
# NamaDatabase.NamaTabel: kunci utama unik. Beberapa tabel dipisahkan dengan koma (,).
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################Database MySQL yang alamat IP-nya ditentukan harus diinisialisasi dan dikonfigurasi. Untuk informasi lebih lanjut, lihat Panduan Cepat Mulai. Untuk informasi lebih lanjut tentang nama topik dinamis berdasarkan nama database dan pengaturan kunci hash utama, lihat Parameter terkait MQ.
3.2 Ubah File Konfigurasi Canal conf/canal.properties
# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAINAnda harus mengonfigurasi parameter canal.serverMode, kafka.bootstrap.servers, kafka.security.protocol, dan kafka.sasl.mechanism. Anda juga dapat mengubah parameter lainnya sesuai kebutuhan. Parameter kafka.bootstrap.servers menentukan titik akhir Kafka di wilayah tempat topik tujuan berada. Untuk informasi lebih lanjut tentang titik akhir Kafka yang tersedia, lihat Kompatibilitas dengan Kafka.
3.3 Ubah File Konfigurasi JASS conf/kafka_client_producer_jaas.conf
kafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};4. Aktifkan dan nonaktifkan Canal
Sebelum mengaktifkan Canal, pastikan bahwa sebuah topik DataHub telah dibuat. Untuk informasi lebih lanjut tentang persyaratan untuk topik yang dibuat, lihat Kompatibilitas dengan Kafka.
4.1 Aktifkan Canal
cd /usr/local/canal/
sh bin/startup.sh4.2 Lihat Log
Jalankan perintah vi logs/canal/canal.log untuk melihat file canal.log di direktori logs/canal/.
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## mulai server canal.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## mulai server canal[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## server canal sedang berjalan sekarang ......Jalankan perintah vi logs/example/example.log untuk melihat log dari sebuah instance.
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Memuat file properti dari sumber daya class path [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Memuat file properti dari sumber daya class path [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - mulai CannalInstance untuk 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - mulai berhasil....Jalankan perintah vi logs/example/meta.log untuk melihat log metadata.
Rekaman dihasilkan untuk setiap penyisipan, penghapusan, dan modifikasi database di file meta.log. Anda dapat melihat file meta.log untuk memeriksa apakah Canal telah mengumpulkan data.
tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]4.3 Nonaktifkan Canal
cd /usr/local/canal/
sh bin/stop.shContoh
Topik DataHub
Topik DataHub tujuan bertipe TUPLE dan memiliki skema berikut:
+-------+------+----------+-------------+
| Index | nama | tipe | izinkan NULL |
+-------+------+----------+-------------+
| 0 | key | STRING | true |
| 1 | val | STRING | true |
+-------+------+----------+-------------+MySQL
Skema tabel MySQL sumber
mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid | int(11) | YES | | NULL | |
| pid | int(11) | YES | | NULL | |
| num | int(11) | YES | | NULL | |
+-------+---------+------+-----+---------+-------+
3 baris dalam set (0,00 detik)Data
Setelah data ditulis ke DataHub, bidang key bernilai null, dan nilai bidang val adalah string JSON.
mysql> insert into orders values(1,2,3);
{
"data":[
{
"oid":"1",
"pid":"2",
"num":"3"
}
],
"database":"ggtt",
"es":1591092305000,
"id":2,
"isDdl":false,
"mysqlType":{
"oid":"int(11)",
"pid":"int(11)",
"num":"int(11)"
},
"old":null,
"pkNames":null,
"sql":"",
"sqlType":{
"oid":4,
"pid":4,
"num":4
},
"table":"orders",
"ts":1591092305813,
"type":"INSERT"
}