All Products
Search
Document Center

DataHub:Plugin DataHub untuk Canal

Last Updated:Jul 02, 2025

Canal digunakan untuk berlangganan dan mengonsumsi data tambahan dengan cara menguraikan log binari dari database MySQL. Awalnya, Alibaba Group perlu menyinkronkan data antara pusat data di Hangzhou dan Amerika Serikat menggunakan pendekatan berbasis pemicu bisnis. Sejak tahun 2010, metode ini berkembang menjadi penguraian log database untuk meningkatkan efisiensi langganan dan konsumsi data tambahan dalam database. Canal mendukung database sumber yang menggunakan mesin MySQL versi 5.1.x, 5.5.x, 5.6.x, 5.7.x, dan 8.0.x.

Informasi latar belakang

Catatan

Canal digunakan untuk berlangganan dan mengonsumsi data tambahan dengan cara menguraikan log binari dari database MySQL. Awalnya, Alibaba Group perlu menyinkronkan data antara pusat data di Hangzhou dan Amerika Serikat menggunakan pendekatan berbasis pemicu bisnis. Sejak tahun 2010, metode ini berkembang menjadi penguraian log database untuk meningkatkan efisiensi langganan dan konsumsi data tambahan dalam database. Canal mendukung database sumber yang menggunakan mesin MySQL versi 5.1.x, 5.5.x, 5.6.x, 5.7.x, dan 8.0.x.

Canal memungkinkan Anda menulis data ke Kafka, dan DataHub kompatibel dengan protokol Kafka. Oleh karena itu, Anda dapat menggunakan Canal untuk menulis data tambahan dari MySQL ke DataHub. Untuk memastikan bahwa Canal dapat menulis data ke DataHub seperti halnya ke Kafka, beberapa perubahan telah dilakukan pada kerangka kerja Canal open source:

  • Nama Topik Kafka sesuai dengan format NamaProyek.NamaTopik di DataHub. Oleh karena itu, logika yang digunakan untuk mengganti titik (.) dalam Nama Topik Kafka dengan garis bawah (_) dihapus dari kerangka kerja Canal open source. Perubahan ini memastikan bahwa Nama Topik Kafka dapat dipetakan ke topik DataHub yang benar.

  • DataHub menggunakan PLAIN Simple Authentication and Security Layer (SASL) untuk otentikasi. Oleh karena itu, variabel lingkungan -Djava.security.auth.login.config=$kafka_jaas_conf ditambahkan ke skrip startup.

Petunjuk

Topik ini memberikan contoh dasar tentang cara menggunakan Canal untuk menulis data ke DataHub seperti Kafka. Untuk informasi lebih lanjut tentang parameter dan deskripsi parameter, lihat canal.

1. Unduh paket canal.deployer

Unduh paket canal.deployer-1.1.5-SNAPSHOT.tar.gz. Versi Canal yang belum dimodifikasi untuk DataHub mungkin tidak dapat menulis data ke DataHub.

2. Salin paket canal.deployer ke direktori tetap dan ekstrak paket tersebut

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

3. Ubah parameter

3.1 Ubah File Konfigurasi Instance conf/example/instance.properties

# Ubah informasi database sesuai kebutuhan.
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password: nama pengguna dan kata sandi database.
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# konfigurasi mq
canal.mq.topic=test_project.test_topic
# Tentukan topik dinamis berdasarkan nama database atau nama tabel.
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# konfigurasi partisi hash
#canal.mq.partitionsNum=3
# NamaDatabase.NamaTabel: kunci utama unik. Beberapa tabel dipisahkan dengan koma (,).
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

Database MySQL yang alamat IP-nya ditentukan harus diinisialisasi dan dikonfigurasi. Untuk informasi lebih lanjut, lihat Panduan Cepat Mulai. Untuk informasi lebih lanjut tentang nama topik dinamis berdasarkan nama database dan pengaturan kunci hash utama, lihat Parameter terkait MQ.

3.2 Ubah File Konfigurasi Canal conf/canal.properties

# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

Anda harus mengonfigurasi parameter canal.serverMode, kafka.bootstrap.servers, kafka.security.protocol, dan kafka.sasl.mechanism. Anda juga dapat mengubah parameter lainnya sesuai kebutuhan. Parameter kafka.bootstrap.servers menentukan titik akhir Kafka di wilayah tempat topik tujuan berada. Untuk informasi lebih lanjut tentang titik akhir Kafka yang tersedia, lihat Kompatibilitas dengan Kafka.

3.3 Ubah File Konfigurasi JASS conf/kafka_client_producer_jaas.conf

kafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

4. Aktifkan dan nonaktifkan Canal

Sebelum mengaktifkan Canal, pastikan bahwa sebuah topik DataHub telah dibuat. Untuk informasi lebih lanjut tentang persyaratan untuk topik yang dibuat, lihat Kompatibilitas dengan Kafka.

4.1 Aktifkan Canal

cd /usr/local/canal/
sh bin/startup.sh

4.2 Lihat Log

Jalankan perintah vi logs/canal/canal.log untuk melihat file canal.log di direktori logs/canal/.

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## mulai server canal.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## mulai server canal[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## server canal sedang berjalan sekarang ......

Jalankan perintah vi logs/example/example.log untuk melihat log dari sebuah instance.

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Memuat file properti dari sumber daya class path [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Memuat file properti dari sumber daya class path [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - mulai CannalInstance untuk 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - mulai berhasil....

Jalankan perintah vi logs/example/meta.log untuk melihat log metadata.

Rekaman dihasilkan untuk setiap penyisipan, penghapusan, dan modifikasi database di file meta.log. Anda dapat melihat file meta.log untuk memeriksa apakah Canal telah mengumpulkan data.

tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]

4.3 Nonaktifkan Canal

cd /usr/local/canal/
sh bin/stop.sh

Contoh

Topik DataHub

Topik DataHub tujuan bertipe TUPLE dan memiliki skema berikut:

+-------+------+----------+-------------+
| Index | nama |   tipe   |  izinkan NULL |
+-------+------+----------+-------------+
|   0   |  key |  STRING  |     true    |
|   1   |  val |  STRING  |     true    |
+-------+------+----------+-------------+

MySQL

Skema tabel MySQL sumber

mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid   | int(11) | YES  |     | NULL    |       |
| pid   | int(11) | YES  |     | NULL    |       |
| num   | int(11) | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
3 baris dalam set (0,00 detik)

Data

Setelah data ditulis ke DataHub, bidang key bernilai null, dan nilai bidang val adalah string JSON.

mysql> insert into orders values(1,2,3);

{
    "data":[
        {
            "oid":"1",
            "pid":"2",
            "num":"3"
        }
    ],
    "database":"ggtt",
    "es":1591092305000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "oid":"int(11)",
        "pid":"int(11)",
        "num":"int(11)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "oid":4,
        "pid":4,
        "num":4
    },
    "table":"orders",
    "ts":1591092305813,
    "type":"INSERT"
}