全部产品
Search
文档中心

Elasticsearch:Gunakan DTS untuk menyinkronkan data dari MySQL ke Alibaba Cloud Elasticsearch secara real-time

更新时间:Jul 06, 2025

Jika Anda ingin mencari dan menganalisis data produksi di database ApsaraDB RDS for MySQL menggunakan Alibaba Cloud Elasticsearch, Anda dapat menggunakan Data Transmission Service (DTS) untuk menyinkronkan data ke kluster Elasticsearch secara real-time. Sinkronisasi ini diimplementasikan melalui tugas sinkronisasi real-time dan cocok untuk skenario yang memerlukan kinerja sinkronisasi tinggi. Topik ini menjelaskan cara membuat tugas sinkronisasi real-time untuk menyinkronkan data dari database ApsaraDB RDS for MySQL ke kluster Alibaba Cloud Elasticsearch, serta cara memverifikasi hasil sinkronisasi data penuh dan tambahan.

Informasi latar belakang

  • DTS adalah layanan transmisi data yang mengintegrasikan migrasi data, langganan data, dan sinkronisasi data real-time. Untuk informasi lebih lanjut, lihat DTS. DTS mendukung sinkronisasi perubahan data yang dihasilkan oleh operasi insert, delete, dan update. Untuk informasi tentang versi sumber data yang didukung, lihat Ikhtisar Skenario Sinkronisasi Data.

  • Anda dapat menggunakan DTS untuk menyinkronkan data penuh atau tambahan dari MySQL ke Elasticsearch. Solusi ini ideal untuk skenario yang memerlukan kinerja sinkronisasi real-time tinggi dari database relasional atau untuk menyinkronkan data penuh atau tambahan dari database relasional ke kluster Alibaba Cloud Elasticsearch.

Peringatan

  • DTS tidak menyinkronkan perubahan data yang dihasilkan oleh operasi DDL. Jika operasi DDL dilakukan pada tabel dalam database sumber selama sinkronisasi data, lakukan langkah-langkah berikut: Hapus tabel dari tugas sinkronisasi data, hapus indeks untuk tabel dari kluster Elasticsearch, lalu tambahkan tabel kembali ke tugas sinkronisasi data. Untuk informasi lebih lanjut, lihat Hapus objek dari tugas sinkronisasi data dan Tambahkan objek ke tugas sinkronisasi data.

  • Jika Anda ingin menambahkan kolom ke tabel sumber, modifikasi pemetaan indeks yang sesuai dengan tabel tersebut. Kemudian, jalankan operasi DDL terkait pada tabel sumber, jeda tugas sinkronisasi, dan mulai ulang tugas tersebut.

  • DTS menggunakan sumber daya baca dan tulis dari sumber dan tujuan selama sinkronisasi data penuh awal. Hal ini dapat meningkatkan beban sumber dan tujuan. Jika performa sumber atau tujuan rendah, spesifikasinya rendah, atau volume datanya besar, sumber atau tujuan mungkin menjadi tidak tersedia. Misalnya, DTS menggunakan banyak sumber daya baca dan tulis jika ada sejumlah besar kueri SQL lambat pada sumber, satu atau beberapa tabel tidak memiliki kunci utama, atau deadlock terjadi pada tujuan. Untuk mencegah masalah ini, evaluasi dampak sinkronisasi data pada performa sumber dan tujuan sebelum memulai sinkronisasi. Kami sarankan menyinkronkan data selama jam non-puncak, seperti ketika utilisasi CPU sumber dan tujuan kurang dari 30%.

    • Jika Anda menyinkronkan data penuh selama jam puncak, sinkronisasi mungkin gagal. Dalam hal ini, mulai ulang tugas sinkronisasi.

    • Jika Anda menyinkronkan data tambahan selama jam puncak, latensi sinkronisasi data mungkin terjadi.

  • ApsaraDB RDS for MySQL dan Elasticsearch mendukung tipe data yang berbeda. Selama sinkronisasi skema awal, DTS membuat pemetaan antara bidang sumber dan tujuan berdasarkan tipe data yang didukung oleh tujuan. Untuk informasi lebih lanjut, lihat Pemetaan Tipe Data untuk Sinkronisasi Skema.

Proses

  1. Buat Persiapan: Tambahkan data yang akan disinkronkan ke database sumber ApsaraDB RDS for MySQL, buat kluster Alibaba Cloud Elasticsearch, dan aktifkan fitur Auto Indexing untuk kluster Elasticsearch.

  2. Buat dan Jalankan Tugas Sinkronisasi Data: Buat dan jalankan tugas sinkronisasi data di konsol DTS.

  3. Verifikasi Hasil Sinkronisasi Data: Masuk ke konsol Kibana kluster Elasticsearch untuk memverifikasi hasil sinkronisasi data penuh. Kemudian, tambahkan data ke database ApsaraDB RDS for MySQL sumber dan verifikasi hasil sinkronisasi data tambahan di konsol Kibana.

Prosedur

Langkah 1: Buat persiapan

Dalam contoh ini, instance ApsaraDB RDS yang menjalankan MySQL 8.0 dan kluster Alibaba Cloud Elasticsearch V7.10 telah disiapkan.

Siapkan database sumber dan data yang akan disinkronkan

  1. Buat instance ApsaraDB RDS yang menjalankan MySQL 8.0. Untuk informasi lebih lanjut, lihat Buat Instance ApsaraDB RDS for MySQL.

  2. Buat akun dan database bernama test_mysql. Untuk informasi lebih lanjut, lihat Buat Akun dan Database.

  3. Dalam database test_mysql, buat tabel bernama es_test dan sisipkan data ke dalam tabel. Dalam contoh ini, pernyataan berikut dieksekusi untuk membuat tabel dan menyisipkan data:

    -- buat tabel
    CREATE TABLE `es_test` (
        `id` bigint(32) NOT NULL,
        `name` varchar(32) NULL,
        `age` bigint(32) NULL,
        `hobby` varchar(32) NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8;
    
    -- sisipkan data
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');

Siapkan kluster Elasticsearch tujuan

  1. Buat kluster Alibaba Cloud Elasticsearch V7.10. Untuk informasi lebih lanjut, lihat Buat Kluster Alibaba Cloud Elasticsearch.

  2. Aktifkan fitur Auto Indexing untuk kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Konfigurasikan File YML.

    image

Langkah 2: Buat dan jalankan tugas sinkronisasi data

  1. Pergi ke Halaman Sinkronisasi Data Konsol DTS Baru.

  2. Klik Create Task.

  3. Di halaman yang muncul, buat dan konfigurasikan tugas sinkronisasi data sesuai petunjuk.

    Catatan

    Untuk informasi tentang parameter yang terlibat dalam langkah-langkah berikut, lihat Sinkronkan Data dari Instance ApsaraDB RDS for MySQL ke Kluster Elasticsearch.

    1. Konfigurasikan sumber dan tujuan. Di bagian bawah halaman, klik Test Connectivity and Proceed.

      image

    2. Konfigurasikan objek dari mana Anda ingin menyinkronkan data.

      image

    3. Konfigurasikan pengaturan lanjutan. Dalam contoh ini, pengaturan lanjutan default digunakan.

    4. Di sublangkah Data Verification, pilih Apply _routing Policy to No Tables.

      Catatan

      Jika kluster Elasticsearch tujuan adalah versi V7.X, Anda harus memilih Terapkan Kebijakan _routing ke Tidak Ada Tabel.

  4. Setelah konfigurasi selesai, simpan tugas sinkronisasi data, lakukan pra-pemeriksaan pada tugas, dan beli instance DTS untuk memulai tugas sinkronisasi data.

    Setelah instance DTS dibeli, tugas sinkronisasi data mulai berjalan. Anda dapat melihat kemajuan sinkronisasi data di halaman Sinkronisasi Data. Setelah data penuh disinkronkan, Anda dapat melihat data penuh di kluster Elasticsearch.

    image

Langkah 3: (Opsional) Verifikasi hasil sinkronisasi data

  1. Masuk ke konsol Kibana kluster Elasticsearch.

    Untuk informasi lebih lanjut, lihat Masuk ke Konsol Kibana.

  2. Di pojok kiri atas konsol Kibana, pilih 菜单.png > Management > Dev Tools. Di halaman yang muncul, klik tab Console.

  3. Verifikasi hasil sinkronisasi data penuh.

    Jalankan perintah berikut:

    GET /es_test/_search

    Jika perintah berhasil dijalankan, hasil berikut dikembalikan:

    {
      "took" : 10,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "user3",
              "age" : 43,
              "hobby" : "game"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "user5",
              "age" : 42,
              "hobby" : "basketball"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "user4",
              "age" : 24,
              "hobby" : "run"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "user2",
              "age" : 23,
              "hobby" : "sport"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "user1",
              "age" : 22,
              "hobby" : "music"
            }
          }
        ]
      }
    }
  4. Verifikasi hasil sinkronisasi data tambahan.

    1. Eksekusi pernyataan berikut untuk menyisipkan catatan data ke tabel sumber:

      INSERT INTO `test_mysql`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');
    2. Setelah catatan data disinkronkan, jalankan perintah GET /es_test/_search.

      Jika perintah berhasil dijalankan, hasil berikut dikembalikan:

      {
        "took" : 541,
        "timed_out" : false,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        },
        "hits" : {
          "total" : {
            "value" : 6,
            "relation" : "eq"
          },
          "max_score" : 1.0,
          "hits" : [
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "3",
              "_score" : 1.0,
              "_source" : {
                "id" : 3,
                "name" : "user3",
                "age" : 43,
                "hobby" : "game"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "5",
              "_score" : 1.0,
              "_source" : {
                "id" : 5,
                "name" : "user5",
                "age" : 42,
                "hobby" : "basketball"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "4",
              "_score" : 1.0,
              "_source" : {
                "id" : 4,
                "name" : "user4",
                "age" : 24,
                "hobby" : "run"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "2",
              "_score" : 1.0,
              "_source" : {
                "id" : 2,
                "name" : "user2",
                "age" : 23,
                "hobby" : "sport"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "6",
              "_score" : 1.0,
              "_source" : {
                "name" : "user6",
                "id" : 6,
                "age" : 30,
                "hobby" : "dance"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "1",
              "_score" : 1.0,
              "_source" : {
                "id" : 1,
                "name" : "user1",
                "age" : 22,
                "hobby" : "music"
              }
            }
          ]
        }
      }