Jika Anda ingin menyinkronkan data tambahan dari database MySQL ke kluster Alibaba Cloud Elasticsearch, serta memiliki persyaratan tinggi untuk performa real-time, Anda dapat menggunakan Canal untuk menyinkronkan data tersebut.
Informasi latar belakang
Canal adalah produk open source yang disediakan oleh Alibaba Group. Canal dapat mengurai data log tambahan di MySQL dan memungkinkan Anda untuk berlangganan serta mengonsumsi data tambahan. Untuk informasi lebih lanjut tentang prinsip kerja dan pengenalan Canal, lihat Canal. Dalam contoh ini, Canal digunakan sebagai node sekunder dari instance ApsaraDB RDS for MySQL dan menerima log biner yang dihasilkan untuk data tambahan dari instance tersebut. Kemudian, API RESTful digunakan untuk menulis data ke kluster Alibaba Cloud Elasticsearch. Solusi yang dijelaskan dalam topik ini cocok untuk skenario di mana pengguna memiliki persyaratan tinggi untuk performa real-time sinkronisasi data.
Prasyarat
Instance ApsaraDB RDS for MySQL, kluster Alibaba Cloud Elasticsearch, dan instance Elastic Compute Service (ECS) telah dibuat. Kami merekomendasikan agar Anda membuatnya di virtual private cloud (VPC) yang sama.
Untuk informasi tentang cara membuat instance ApsaraDB RDS for MySQL, lihat Buat Instance ApsaraDB RDS for MySQL. Dalam contoh ini, instance ApsaraDB RDS yang menjalankan MySQL 5.7 dibuat.
Untuk informasi tentang cara membuat kluster Alibaba Cloud Elasticsearch, lihat Buat Kluster Alibaba Cloud Elasticsearch. Dalam contoh ini, kluster Elasticsearch V6.7 Edisi Kernel-ditingkatkan dibuat.
CatatanSebelum menggunakan Canal untuk menulis data ke kluster Elasticsearch, Anda harus menambahkan alamat IP instance ECS tempat Canal diterapkan ke daftar putih alamat IP kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP Publik atau Privat untuk Kluster Elasticsearch.
Instance ECS digunakan untuk menerapkan server Canal dan adapter Canal. Untuk informasi tentang cara membuat instance ECS, lihat Buat Instance Menggunakan Wizard. Dalam contoh ini, instance ECS yang menjalankan image CentOS 7.6 64-bit dibuat.
Batasan
Solusi yang dijelaskan dalam topik ini hanya dapat digunakan untuk menyinkronkan data tambahan MySQL ke kluster Alibaba Cloud Elasticsearch.
Versi Java Development Kit (JDK) yang Anda instal harus versi 1.8.0 atau yang lebih baru.
Canal 1.1.4 tidak dapat digunakan untuk menyinkronkan data ke kluster Elasticsearch V7.X.
Anda perlu menggunakan Canal 1.1.5 untuk menyinkronkan data ke kluster Elasticsearch V7.X serta menggunakan Canal 1.1.7 untuk menyinkronkan data ke kluster Elasticsearch V8.X. Anda juga dapat menggunakan metode lain seperti Logstash atau Data Transmission Service (DTS) untuk menyinkronkan data MySQL.
Saat melakukan konfigurasi untuk menyinkronkan data, Anda dapat menyesuaikan pemetaan untuk indeks yang diperlukan. Pastikan bahwa nama dan tipe data bidang yang didefinisikan dalam pemetaan untuk indeks sama dengan nama dan tipe data bidang di database ApsaraDB RDS for MySQL yang diinginkan.
Jika Anda menggunakan solusi yang dijelaskan dalam topik ini untuk menyinkronkan data, pastikan bahwa Canal tersedia. Jika tidak, kesalahan mungkin terjadi, atau layanan mungkin terganggu. Misalnya, pastikan bahwa sinkronisasi data tidak terganggu dalam skenario seperti restart instance ECS dan keluarnya Canal karena pengecualian.
Adapter Canal tidak dapat terhubung ke kluster Alibaba Cloud Elasticsearch melalui HTTPS.
Prosedur
Langkah 1: Siapkan sumber data MySQL
Masuk ke Konsol ApsaraDB RDS, dan buat database ApsaraDB RDS for MySQL serta tabel. Untuk informasi lebih lanjut, lihat Alur Kerja Umum untuk Menggunakan ApsaraDB RDS for MySQL. Dalam contoh ini, pernyataan berikut digunakan untuk membuat tabel:
-- buat tabel
CREATE TABLE `es_test` (
`id` bigint(32) NOT NULL,
`name` text NOT NULL,
`count` text NOT NULL,
`color` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;Langkah 2: Buat indeks dan konfigurasikan pemetaan untuk indeks
Masuk ke konsol Kibana kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Masuk ke Konsol Kibana.
CatatanDalam contoh ini, kluster Elasticsearch V6.7.0 digunakan. Operasi pada kluster versi lain mungkin berbeda.
Di panel navigasi kiri halaman yang muncul, klik Dev Tools.
Di tab Console, jalankan perintah berikut untuk membuat indeks.
Dalam contoh ini, indeks bernama es_test dibuat. Indeks tersebut berisi bidang berikut: count, id, name, dan color.
PentingPastikan bahwa nama dan tipe data bidang yang didefinisikan dalam pemetaan untuk indeks sama dengan nama dan tipe data bidang yang dipersiapkan di Langkah 1: Siapkan Sumber Data MySQL.
PUT es_test?include_type_name=true { "settings" : { "index" : { "number_of_shards" : "5", "number_of_replicas" : "1" } }, "mappings" : { "_doc" : { "properties" : { "count": { "type": "text" }, "id": { "type": "integer" }, "name": { "type" : "text", "analyzer": "ik_smart" }, "color" : { "type" : "text" } } } } }Setelah indeks dibuat dan pemetaan dikonfigurasi, hasil berikut dikembalikan:
{ "acknowledged" : true, "shards_acknowledged" : true, "index" : "es_test" }
Langkah 3: Instal JDK
Hubungkan ke instance ECS.
Untuk informasi lebih lanjut, lihat Hubungkan ke Instance Linux Menggunakan Kata Sandi atau Kunci.
CatatanDalam contoh ini, pengguna biasa digunakan.
Lihat paket JDK yang tersedia.
sudo yum search java | grep -i --color JDKInstal JDK versi yang diperlukan.
Dalam contoh ini, java-1.8.0-openjdk-devel.x86_64 digunakan.
sudo yum install java-1.8.0-openjdk-devel.x86_64Konfigurasikan variabel lingkungan.
Buka file profil di folder etc.
vim ~/.bash_profileTambahkan variabel lingkungan berikut ke file:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/binPentingSaat menggunakan variabel lingkungan, Anda harus mengganti JAVA_HOME dengan jalur instalasi JDK Anda. Anda dapat menjalankan perintah
find / -name 'java'untuk menanyakan jalur instalasi JDK Anda.Tekan Esc, dan jalankan perintah
:wquntuk menyimpan file dan keluar dari mode vi. Lalu, jalankan perintah berikut untuk menerapkan konfigurasi:source ~/.bash_profile
Jalankan perintah berikut untuk memeriksa apakah JDK telah diinstal:
java -versionJika hasil berikut dikembalikan, JDK telah diinstal:
openjdk version "1.8.0_362" OpenJDK Runtime Environment (build 1.8.0_362-b08) OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
Langkah 4: Instal dan mulai server Canal
Unduh paket server Canal.
Dalam contoh ini, server Canal 1.1.4 digunakan.
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gzCatatanCanal 1.1.5 mendukung kluster Elasticsearch V7.0. Jika Anda menggunakan kluster Elasticsearch V7.0, Anda perlu mengunduh paket Canal 1.1.5. Untuk informasi lebih lanjut, lihat Catatan Rilis Canal.
Anda harus mengunduh paket server Canal dan adapter Canal melalui Internet. Pastikan instance ECS Anda dapat mengakses Internet.
Jalankan perintah berikut untuk mengekstrak paket:
tar -zxvf canal.deployer-1.1.4.tar.gzJalankan perintah berikut untuk memodifikasi file
instance.propertiesdi direktori conf/example/:vi conf/example/instance.properties
Parameter
Deskripsi
canal.instance.master.address
Anda harus menyetel parameter ini ke nilai dalam format <Titik akhir internal instance ApsaraDB RDS for MySQL>:<Port internal>. Anda dapat memperoleh informasi yang diperlukan di halaman Basic Information instance ApsaraDB RDS for MySQL. Contoh: rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306.
canal.instance.dbUsername
Nama pengguna yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL. Anda dapat memperoleh nama pengguna di halaman Accounts instance ApsaraDB RDS for MySQL.
canal.instance.dbPassword
Kata sandi yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL.
Tekan Esc, dan jalankan perintah
:wquntuk menyimpan file dan keluar dari mode vi.Mulai server Canal dan kueri log.
./bin/startup.sh cat logs/canal/canal.log
Langkah 5: Instal dan mulai adapter Canal
Unduh paket adapter Canal.
Dalam contoh ini, adapter Canal 1.1.4 digunakan.
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gzCatatanCanal 1.1.5 mendukung kluster Elasticsearch V7.0. Jika Anda menggunakan kluster Elasticsearch V7.0, Anda perlu mengunduh paket Canal 1.1.5. Untuk informasi lebih lanjut, lihat Catatan Rilis Canal.
Anda harus mengunduh paket server Canal dan adapter Canal melalui Internet. Pastikan instance ECS Anda dapat mengakses Internet.
Jalankan perintah berikut untuk mengekstrak paket:
tar -zxvf canal.adapter-1.1.4.tar.gzJalankan perintah berikut untuk memodifikasi file
application.ymldi direktori conf/:vi conf/application.yml
Parameter
Deskripsi
canal.conf.canalServerHost
Alamat deployer Canal. Pertahankan nilai default 127.0.0.1:11111.
canal.conf.srcDataSources.defaultDS.url
Anda harus menyetel parameter ini ke nilai dalam format jdbc:mysql://<Titik akhir internal instance ApsaraDB RDS for MySQL>:<Port internal>/<Nama database>?useUnicode=true. Anda dapat memperoleh informasi yang diperlukan di halaman Basic Information instance ApsaraDB RDS for MySQL. Contoh: jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true.
canal.conf.srcDataSources.defaultDS.username
Nama pengguna yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL. Anda dapat memperoleh nama pengguna di halaman Accounts instance ApsaraDB RDS for MySQL.
canal.conf.srcDataSources.defaultDS.password
Kata sandi yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL.
canal.conf.canalAdapters.groups.outerAdapters.hosts
Temukan name:es dan set hosts ke nilai dalam format <Titik akhir internal kluster Elasticsearch>:<Port internal>. Anda dapat memperoleh informasi yang diperlukan di halaman Informasi Dasar kluster Elasticsearch. Contoh: es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200.
canal.conf.canalAdapters.groups.outerAdapters.mode
Setel parameter ini ke rest.
canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth
Anda harus menyetel parameter ini ke nilai dalam format <Nama pengguna kluster Elasticsearch>:<Kata sandi yang sesuai dengan nama pengguna>. Contoh: elastic:es_password.
canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name
ID kluster Elasticsearch. Anda dapat memperoleh ID di halaman Informasi Dasar kluster Elasticsearch. Contoh: es-cn-v64xxxxxxxxx3medp.
Tekan Esc, dan jalankan perintah
:wquntuk menyimpan file dan keluar dari mode vi.Ulangi langkah-langkah sebelumnya untuk memodifikasi file
*.ymldi direktori conf/es/ dan tentukan bidang yang ingin Anda petakan dari database ApsaraDB RDS for MySQL ke kluster Elasticsearch.
Parameter
Deskripsi
esMapping._index
Setel nilai ke nama indeks yang dibuat di kluster Elasticsearch di Langkah 2: Buat indeks dan konfigurasikan pemetaan untuk indeks. Dalam contoh ini, es_test digunakan.
esMapping._type
Setel nilai ke tipe indeks yang dibuat di kluster Elasticsearch di Langkah 2: Buat indeks dan konfigurasikan pemetaan untuk indeks. Dalam contoh ini, _doc digunakan.
esMapping._id
ID dokumen yang dihasilkan untuk bidang yang ingin Anda sinkronkan ke kluster Elasticsearch. Anda dapat menentukan ID kustom. Dalam contoh ini, _id digunakan.
esMapping.sql
Pernyataan SQL yang digunakan untuk menanyakan bidang yang ingin Anda sinkronkan ke kluster Elasticsearch. Dalam contoh ini, pernyataan
select t.id as _id,t.id,t.count,t.name,t.color from es_test tdigunakan.Mulai adapter Canal dan kueri log.
./bin/startup.sh cat logs/adapter/adapter.logCatatanDalam contoh ini, instance ApsaraDB RDS for MySQL yang menjalankan MySQL 5.7 digunakan. Jika Anda menggunakan instance ApsaraDB RDS for MySQL yang menjalankan versi MySQL lainnya, Anda harus memastikan bahwa versi driver MySQL di adapter Canal sesuai dengan versi MySQL dari instance ApsaraDB RDS for MySQL yang ingin Anda sambungkan. Jika tidak, adapter Canal gagal dimulai. Untuk informasi lebih lanjut, lihat bagian FAQ dalam topik ini.
Jika hasil seperti pada gambar berikut dikembalikan, adapter Canal telah dimulai.

Langkah 6: Verifikasi hasil sinkronisasi data tambahan
Di database ApsaraDB RDS for MySQL, tambahkan data ke, modifikasi data di, atau hapus data dari tabel es_test.
insert `ES`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');Masuk ke konsol Kibana kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Masuk ke Konsol Kibana.
Di panel navigasi kiri halaman yang muncul, klik Dev Tools.
Di tab Console, jalankan perintah berikut untuk menanyakan data yang disinkronkan:
GET /es_test/_searchJika sinkronisasi data berhasil, hasil seperti pada gambar berikut dikembalikan.
PentingCanal hanya menyinkronkan data tambahan.
FAQ
T: Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan dalam log yang dihasilkan untuk adapter Canal saat saya memulai adapter: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource?
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
A: Ganti client-adapter.es7x-1.1.5-jar-with-dependencies.jar di direktori canal.adapter-1.1.5\plugin dengan file terkait untuk canal-1.1.5-alpha-2.
Untuk informasi lebih lanjut, Masalah Canal.
Dalam contoh ini, pengguna root digunakan.
Unduh paket canal-1.1.5-alpha-2. Untuk informasi lebih lanjut, lihat Catatan Rilis Canal.
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gzEkstrak paket.
tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gzSalin client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar di folder plugin untuk canal-1.1.5-alpha-2 ke direktori canal.adapter-1.1.5\plugin.
CatatanDirektori file yang perlu Anda salin mungkin berbeda. Direktori aktual yang berlaku.
cp canal.adapter-1.1.5-SNAPSHOT/plugin/client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar canal/canal.adapter/pluginHapus client-adapter.es7x-1.1.5-jar-with-dependencies.jar dari direktori canal.adapter-1.1.5\plugin.
rm -rf client-adapter.es7x-1.1.5-jar-with-dependencies.jarUbah nama file.
mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar
T: Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan dalam log yang dihasilkan untuk adapter Canal saat saya memulai adapter: java.sql.SQLException: Unknown system variable 'query_cache_size'?
A: Alasan yang mungkin adalah bahwa versi driver MySQL di adapter Canal tidak sesuai dengan versi MySQL dari database ApsaraDB RDS for MySQL yang ingin Anda sambungkan. Sebagai contoh, versi driver MySQL di adapter Canal 1.1.4 adalah 5.1.40. Jika Anda menggunakan Canal yang menggunakan adapter Canal versi ini untuk menyambungkan ke database ApsaraDB RDS for MySQL yang menjalankan MySQL 8.x, pesan kesalahan tersebut akan dikembalikan. Ubah driver MySQL di adapter Canal dan pastikan bahwa versi driver MySQL sesuai dengan versi MySQL dari database ApsaraDB RDS for MySQL.
T: Saya ingin menyinkronkan data dari instance ApsaraDB RDS for MySQL yang menjalankan MySQL 8.0 ke kluster Elasticsearch. Bagaimana cara mengubah driver MySQL yang ada menjadi driver MySQL 8.0?
A: Dalam contoh ini, pengguna root digunakan.
Unduh paket driver MySQL 8.0.
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zipEkstrak paket:
unzip mysql-connector-java-8.0.29.zipSalin file yang diperoleh ke direktori lib dari adapter Canal.
mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/Tambahkan izin.
chmod 777 lib/mysql-connector-java-8.0.29.jar chmod +st lib/mysql-connector-java-8.0.29.jarHapus driver MySQL 5.x.
rm -rf lib/mysql-connector-java-5.1.40.jar