全部产品
Search
文档中心

Elasticsearch:Gunakan Canal untuk menyinkronkan data dari MySQL ke Alibaba Cloud Elasticsearch

更新时间:Jul 06, 2025

Jika Anda ingin menyinkronkan data tambahan dari database MySQL ke kluster Alibaba Cloud Elasticsearch, serta memiliki persyaratan tinggi untuk performa real-time, Anda dapat menggunakan Canal untuk menyinkronkan data tersebut.

Informasi latar belakang

Canal adalah produk open source yang disediakan oleh Alibaba Group. Canal dapat mengurai data log tambahan di MySQL dan memungkinkan Anda untuk berlangganan serta mengonsumsi data tambahan. Untuk informasi lebih lanjut tentang prinsip kerja dan pengenalan Canal, lihat Canal. Dalam contoh ini, Canal digunakan sebagai node sekunder dari instance ApsaraDB RDS for MySQL dan menerima log biner yang dihasilkan untuk data tambahan dari instance tersebut. Kemudian, API RESTful digunakan untuk menulis data ke kluster Alibaba Cloud Elasticsearch. Solusi yang dijelaskan dalam topik ini cocok untuk skenario di mana pengguna memiliki persyaratan tinggi untuk performa real-time sinkronisasi data.

Prasyarat

Instance ApsaraDB RDS for MySQL, kluster Alibaba Cloud Elasticsearch, dan instance Elastic Compute Service (ECS) telah dibuat. Kami merekomendasikan agar Anda membuatnya di virtual private cloud (VPC) yang sama.

Batasan

  • Solusi yang dijelaskan dalam topik ini hanya dapat digunakan untuk menyinkronkan data tambahan MySQL ke kluster Alibaba Cloud Elasticsearch.

  • Versi Java Development Kit (JDK) yang Anda instal harus versi 1.8.0 atau yang lebih baru.

  • Canal 1.1.4 tidak dapat digunakan untuk menyinkronkan data ke kluster Elasticsearch V7.X.

    Anda perlu menggunakan Canal 1.1.5 untuk menyinkronkan data ke kluster Elasticsearch V7.X serta menggunakan Canal 1.1.7 untuk menyinkronkan data ke kluster Elasticsearch V8.X. Anda juga dapat menggunakan metode lain seperti Logstash atau Data Transmission Service (DTS) untuk menyinkronkan data MySQL.

  • Saat melakukan konfigurasi untuk menyinkronkan data, Anda dapat menyesuaikan pemetaan untuk indeks yang diperlukan. Pastikan bahwa nama dan tipe data bidang yang didefinisikan dalam pemetaan untuk indeks sama dengan nama dan tipe data bidang di database ApsaraDB RDS for MySQL yang diinginkan.

  • Jika Anda menggunakan solusi yang dijelaskan dalam topik ini untuk menyinkronkan data, pastikan bahwa Canal tersedia. Jika tidak, kesalahan mungkin terjadi, atau layanan mungkin terganggu. Misalnya, pastikan bahwa sinkronisasi data tidak terganggu dalam skenario seperti restart instance ECS dan keluarnya Canal karena pengecualian.

  • Adapter Canal tidak dapat terhubung ke kluster Alibaba Cloud Elasticsearch melalui HTTPS.

Prosedur

Langkah 1: Siapkan sumber data MySQL

Masuk ke Konsol ApsaraDB RDS, dan buat database ApsaraDB RDS for MySQL serta tabel. Untuk informasi lebih lanjut, lihat Alur Kerja Umum untuk Menggunakan ApsaraDB RDS for MySQL. Dalam contoh ini, pernyataan berikut digunakan untuk membuat tabel:

-- buat tabel
CREATE TABLE `es_test` (
    `id` bigint(32) NOT NULL,
    `name` text NOT NULL,
    `count` text NOT NULL,
    `color` text NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;

Langkah 2: Buat indeks dan konfigurasikan pemetaan untuk indeks

  1. Masuk ke konsol Kibana kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Masuk ke Konsol Kibana.

    Catatan

    Dalam contoh ini, kluster Elasticsearch V6.7.0 digunakan. Operasi pada kluster versi lain mungkin berbeda.

  2. Di panel navigasi kiri halaman yang muncul, klik Dev Tools.

  3. Di tab Console, jalankan perintah berikut untuk membuat indeks.

    Dalam contoh ini, indeks bernama es_test dibuat. Indeks tersebut berisi bidang berikut: count, id, name, dan color.

    Penting

    Pastikan bahwa nama dan tipe data bidang yang didefinisikan dalam pemetaan untuk indeks sama dengan nama dan tipe data bidang yang dipersiapkan di Langkah 1: Siapkan Sumber Data MySQL.

    PUT es_test?include_type_name=true
    {
    
        "settings" : {
          "index" : {
            "number_of_shards" : "5",
            "number_of_replicas" : "1"
          }
        },
        "mappings" : {
            "_doc" : {
                "properties" : {
                  "count": {          
                       "type": "text"       
                   },
                  "id": {
                       "type": "integer"
                   },
                   "name": {
                        "type" : "text",
                        "analyzer": "ik_smart"                   
                    },
                    "color" : {
                        "type" : "text"                    
                    }
                }
            }
        }
    }

    Setelah indeks dibuat dan pemetaan dikonfigurasi, hasil berikut dikembalikan:

    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "es_test"
    }

Langkah 3: Instal JDK

  1. Hubungkan ke instance ECS.

    Untuk informasi lebih lanjut, lihat Hubungkan ke Instance Linux Menggunakan Kata Sandi atau Kunci.

    Catatan

    Dalam contoh ini, pengguna biasa digunakan.

  2. Lihat paket JDK yang tersedia.

    sudo yum search java | grep -i --color JDK
  3. Instal JDK versi yang diperlukan.

    Dalam contoh ini, java-1.8.0-openjdk-devel.x86_64 digunakan.

    sudo yum install java-1.8.0-openjdk-devel.x86_64
  4. Konfigurasikan variabel lingkungan.

    1. Buka file profil di folder etc.

      vim ~/.bash_profile
    2. Tambahkan variabel lingkungan berikut ke file:

      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64
      export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
      export PATH=$PATH:$JAVA_HOME/bin
      Penting

      Saat menggunakan variabel lingkungan, Anda harus mengganti JAVA_HOME dengan jalur instalasi JDK Anda. Anda dapat menjalankan perintah find / -name 'java' untuk menanyakan jalur instalasi JDK Anda.

    3. Tekan Esc, dan jalankan perintah :wq untuk menyimpan file dan keluar dari mode vi. Lalu, jalankan perintah berikut untuk menerapkan konfigurasi:

      source ~/.bash_profile
  5. Jalankan perintah berikut untuk memeriksa apakah JDK telah diinstal:

    java -version

    Jika hasil berikut dikembalikan, JDK telah diinstal:

    openjdk version "1.8.0_362"
    OpenJDK Runtime Environment (build 1.8.0_362-b08)
    OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)

Langkah 4: Instal dan mulai server Canal

  1. Unduh paket server Canal.

    Dalam contoh ini, server Canal 1.1.4 digunakan.

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    Catatan
    • Canal 1.1.5 mendukung kluster Elasticsearch V7.0. Jika Anda menggunakan kluster Elasticsearch V7.0, Anda perlu mengunduh paket Canal 1.1.5. Untuk informasi lebih lanjut, lihat Catatan Rilis Canal.

    • Anda harus mengunduh paket server Canal dan adapter Canal melalui Internet. Pastikan instance ECS Anda dapat mengakses Internet.

  2. Jalankan perintah berikut untuk mengekstrak paket:

    tar -zxvf canal.deployer-1.1.4.tar.gz
  3. Jalankan perintah berikut untuk memodifikasi file instance.properties di direktori conf/example/:

    vi conf/example/instance.properties

    修改conf/example/instance.properties文件

    Parameter

    Deskripsi

    canal.instance.master.address

    Anda harus menyetel parameter ini ke nilai dalam format <Titik akhir internal instance ApsaraDB RDS for MySQL>:<Port internal>. Anda dapat memperoleh informasi yang diperlukan di halaman Basic Information instance ApsaraDB RDS for MySQL. Contoh: rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306.

    canal.instance.dbUsername

    Nama pengguna yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL. Anda dapat memperoleh nama pengguna di halaman Accounts instance ApsaraDB RDS for MySQL.

    canal.instance.dbPassword

    Kata sandi yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL.

  4. Tekan Esc, dan jalankan perintah :wq untuk menyimpan file dan keluar dari mode vi.

  5. Mulai server Canal dan kueri log.

    ./bin/startup.sh
    cat logs/canal/canal.log

    启动canal-server

Langkah 5: Instal dan mulai adapter Canal

  1. Unduh paket adapter Canal.

    Dalam contoh ini, adapter Canal 1.1.4 digunakan.

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
    Catatan
    • Canal 1.1.5 mendukung kluster Elasticsearch V7.0. Jika Anda menggunakan kluster Elasticsearch V7.0, Anda perlu mengunduh paket Canal 1.1.5. Untuk informasi lebih lanjut, lihat Catatan Rilis Canal.

    • Anda harus mengunduh paket server Canal dan adapter Canal melalui Internet. Pastikan instance ECS Anda dapat mengakses Internet.

  2. Jalankan perintah berikut untuk mengekstrak paket:

    tar -zxvf canal.adapter-1.1.4.tar.gz
  3. Jalankan perintah berikut untuk memodifikasi file application.yml di direktori conf/:

    vi conf/application.yml

    image..png

    Parameter

    Deskripsi

    canal.conf.canalServerHost

    Alamat deployer Canal. Pertahankan nilai default 127.0.0.1:11111.

    canal.conf.srcDataSources.defaultDS.url

    Anda harus menyetel parameter ini ke nilai dalam format jdbc:mysql://<Titik akhir internal instance ApsaraDB RDS for MySQL>:<Port internal>/<Nama database>?useUnicode=true. Anda dapat memperoleh informasi yang diperlukan di halaman Basic Information instance ApsaraDB RDS for MySQL. Contoh: jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true.

    canal.conf.srcDataSources.defaultDS.username

    Nama pengguna yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL. Anda dapat memperoleh nama pengguna di halaman Accounts instance ApsaraDB RDS for MySQL.

    canal.conf.srcDataSources.defaultDS.password

    Kata sandi yang digunakan untuk masuk ke database ApsaraDB RDS for MySQL.

    canal.conf.canalAdapters.groups.outerAdapters.hosts

    Temukan name:es dan set hosts ke nilai dalam format <Titik akhir internal kluster Elasticsearch>:<Port internal>. Anda dapat memperoleh informasi yang diperlukan di halaman Informasi Dasar kluster Elasticsearch. Contoh: es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200.

    canal.conf.canalAdapters.groups.outerAdapters.mode

    Setel parameter ini ke rest.

    canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth

    Anda harus menyetel parameter ini ke nilai dalam format <Nama pengguna kluster Elasticsearch>:<Kata sandi yang sesuai dengan nama pengguna>. Contoh: elastic:es_password.

    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name

    ID kluster Elasticsearch. Anda dapat memperoleh ID di halaman Informasi Dasar kluster Elasticsearch. Contoh: es-cn-v64xxxxxxxxx3medp.

  4. Tekan Esc, dan jalankan perintah :wq untuk menyimpan file dan keluar dari mode vi.

  5. Ulangi langkah-langkah sebelumnya untuk memodifikasi file *.yml di direktori conf/es/ dan tentukan bidang yang ingin Anda petakan dari database ApsaraDB RDS for MySQL ke kluster Elasticsearch.

    修改conf/es/*.yml文件

    Parameter

    Deskripsi

    esMapping._index

    Setel nilai ke nama indeks yang dibuat di kluster Elasticsearch di Langkah 2: Buat indeks dan konfigurasikan pemetaan untuk indeks. Dalam contoh ini, es_test digunakan.

    esMapping._type

    Setel nilai ke tipe indeks yang dibuat di kluster Elasticsearch di Langkah 2: Buat indeks dan konfigurasikan pemetaan untuk indeks. Dalam contoh ini, _doc digunakan.

    esMapping._id

    ID dokumen yang dihasilkan untuk bidang yang ingin Anda sinkronkan ke kluster Elasticsearch. Anda dapat menentukan ID kustom. Dalam contoh ini, _id digunakan.

    esMapping.sql

    Pernyataan SQL yang digunakan untuk menanyakan bidang yang ingin Anda sinkronkan ke kluster Elasticsearch. Dalam contoh ini, pernyataan select t.id as _id,t.id,t.count,t.name,t.color from es_test t digunakan.

  6. Mulai adapter Canal dan kueri log.

    ./bin/startup.sh
    cat logs/adapter/adapter.log
    Catatan

    Dalam contoh ini, instance ApsaraDB RDS for MySQL yang menjalankan MySQL 5.7 digunakan. Jika Anda menggunakan instance ApsaraDB RDS for MySQL yang menjalankan versi MySQL lainnya, Anda harus memastikan bahwa versi driver MySQL di adapter Canal sesuai dengan versi MySQL dari instance ApsaraDB RDS for MySQL yang ingin Anda sambungkan. Jika tidak, adapter Canal gagal dimulai. Untuk informasi lebih lanjut, lihat bagian FAQ dalam topik ini.

    Jika hasil seperti pada gambar berikut dikembalikan, adapter Canal telah dimulai.Canal-adapter服务日志

Langkah 6: Verifikasi hasil sinkronisasi data tambahan

  1. Di database ApsaraDB RDS for MySQL, tambahkan data ke, modifikasi data di, atau hapus data dari tabel es_test.

    insert `ES`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
  2. Masuk ke konsol Kibana kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Masuk ke Konsol Kibana.

  3. Di panel navigasi kiri halaman yang muncul, klik Dev Tools.

  4. Di tab Console, jalankan perintah berikut untuk menanyakan data yang disinkronkan:

    GET /es_test/_search

    Jika sinkronisasi data berhasil, hasil seperti pada gambar berikut dikembalikan.数据同步成功结果

    Penting

    Canal hanya menyinkronkan data tambahan.

FAQ

T: Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan dalam log yang dihasilkan untuk adapter Canal saat saya memulai adapter: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource?

at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]

A: Ganti client-adapter.es7x-1.1.5-jar-with-dependencies.jar di direktori canal.adapter-1.1.5\plugin dengan file terkait untuk canal-1.1.5-alpha-2.

Catatan

Untuk informasi lebih lanjut, Masalah Canal.

Dalam contoh ini, pengguna root digunakan.

  1. Unduh paket canal-1.1.5-alpha-2. Untuk informasi lebih lanjut, lihat Catatan Rilis Canal.

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
  2. Ekstrak paket.

    tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz
  3. Salin client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar di folder plugin untuk canal-1.1.5-alpha-2 ke direktori canal.adapter-1.1.5\plugin.

    Catatan

    Direktori file yang perlu Anda salin mungkin berbeda. Direktori aktual yang berlaku.

    cp canal.adapter-1.1.5-SNAPSHOT/plugin/client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar canal/canal.adapter/plugin
  4. Hapus client-adapter.es7x-1.1.5-jar-with-dependencies.jar dari direktori canal.adapter-1.1.5\plugin.

    rm -rf client-adapter.es7x-1.1.5-jar-with-dependencies.jar
  5. Ubah nama file.

    mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar 

T: Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan dalam log yang dihasilkan untuk adapter Canal saat saya memulai adapter: java.sql.SQLException: Unknown system variable 'query_cache_size'?

A: Alasan yang mungkin adalah bahwa versi driver MySQL di adapter Canal tidak sesuai dengan versi MySQL dari database ApsaraDB RDS for MySQL yang ingin Anda sambungkan. Sebagai contoh, versi driver MySQL di adapter Canal 1.1.4 adalah 5.1.40. Jika Anda menggunakan Canal yang menggunakan adapter Canal versi ini untuk menyambungkan ke database ApsaraDB RDS for MySQL yang menjalankan MySQL 8.x, pesan kesalahan tersebut akan dikembalikan. Ubah driver MySQL di adapter Canal dan pastikan bahwa versi driver MySQL sesuai dengan versi MySQL dari database ApsaraDB RDS for MySQL.

T: Saya ingin menyinkronkan data dari instance ApsaraDB RDS for MySQL yang menjalankan MySQL 8.0 ke kluster Elasticsearch. Bagaimana cara mengubah driver MySQL yang ada menjadi driver MySQL 8.0?

A: Dalam contoh ini, pengguna root digunakan.

  1. Unduh paket driver MySQL 8.0.

    wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip
  2. Ekstrak paket:

    unzip mysql-connector-java-8.0.29.zip
  3. Salin file yang diperoleh ke direktori lib dari adapter Canal.

    mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/
  4. Tambahkan izin.

    chmod 777 lib/mysql-connector-java-8.0.29.jar
    chmod +st lib/mysql-connector-java-8.0.29.jar
  5. Hapus driver MySQL 5.x.

    rm -rf lib/mysql-connector-java-5.1.40.jar

Referensi