All Products
Search
Document Center

Elasticsearch:Gunakan Logstash untuk menyinkronkan data dari PolarDB-X (DRDS) ke Elasticsearch

Last Updated:Mar 27, 2026

Topik ini menjelaskan cara menggunakan Alibaba Cloud Logstash untuk menyinkronkan data dari database PolarDB-X 1.0 ke Alibaba Cloud Elasticsearch guna mendukung pencarian teks lengkap dan analitik semantik.

Cara kerja

Alibaba Cloud Logstash menggunakan plug-in logstash-input-jdbc untuk secara berkala mengambil catatan dari PolarDB-X 1.0 yang telah dimasukkan atau diperbarui sejak pengambilan terakhir. Plug-in ini diinstal secara default pada semua kluster Logstash dan tidak dapat dihapus.

Agar sinkronisasi inkremental berfungsi dengan benar, dua kondisi berikut harus dipenuhi:

  1. Pemetaan bidang ID: Bidang _id di Elasticsearch harus sesuai dengan bidang id di PolarDB-X 1.0. Pemetaan ini memungkinkan pipeline sinkronisasi menimpa dokumen Elasticsearch yang tepat ketika suatu catatan diperbarui di PolarDB-X 1.0.

  2. Bidang timestamp: Setiap catatan di tabel sumber harus mencakup bidang yang menyimpan waktu penyisipan atau pembaruan. Plug-in melacak bidang ini untuk menentukan catatan mana yang akan disertakan dalam setiap pengambilan—hanya catatan dengan timestamp lebih baru daripada pengambilan sebelumnya yang disinkronkan.

Gunakan pendekatan ini jika Anda perlu menyinkronkan data lengkap dengan latensi beberapa detik atau jika Anda perlu mengkueri dan menyinkronkan catatan tertentu pada interval terjadwal.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Instans PolarDB-X 1.0 dengan database yang telah dibuat

  • Kluster Alibaba Cloud Elasticsearch (contoh ini menggunakan V6.7, Edisi Standar)

  • Kluster Alibaba Cloud Logstash

Deploy ketiga sumber daya tersebut dalam virtual private cloud (VPC) yang sama untuk meminimalkan latensi jaringan dan menghindari konfigurasi tambahan.

Catatan

Untuk mengumpulkan data dari Internet atau mentransfer data ke Internet melalui Logstash, konfigurasikan gerbang Network Address Translation (NAT). Lihat Configure a NAT gateway for data transmission over the Internet.

Batasan

  • Penghapusan tidak disinkronkan: Plug-in logstash-input-jdbc tidak dapat menyebarkan penghapusan dari PolarDB-X 1.0 ke Elasticsearch. Untuk menangani catatan yang dihapus, gunakan salah satu strategi berikut:

    • Soft delete (disarankan): Tambahkan bidang is_deleted ke tabel Anda. Saat catatan dihapus secara logis, atur is_deleted menjadi true. Plug-in menyinkronkan perubahan ini ke Elasticsearch, tempat Anda dapat memfilter dokumen yang dihapus secara logis dalam kueri.

    • Penghapusan eksternal: Pastikan sistem apa pun yang bertanggung jawab menghapus catatan di PolarDB-X 1.0 juga menjalankan perintah penghapusan yang sesuai langsung pada kluster Elasticsearch.

  • Persyaratan bidang ID: Bidang _id di Elasticsearch harus sesuai dengan bidang id di PolarDB-X 1.0. Ini diperlukan agar pipeline dapat menimpa dokumen dengan benar saat catatan diperbarui. Di Elasticsearch, penimpaan setara dengan menghapus dokumen asli dan mengindeks yang diperbarui—proses ini sama efisiennya dengan operasi pembaruan native.

  • Persyaratan bidang timestamp: Setiap catatan yang dimasukkan atau diperbarui harus mencakup bidang yang merekam waktu penyisipan atau pembaruan. Catatan tanpa bidang ini tidak disertakan dalam sinkronisasi inkremental.

  • Tracking column order: Kolom yang ditentukan sebagai tracking_column harus memiliki nilai dalam urutan menaik.

Menyinkronkan data dari PolarDB-X 1.0 ke Elasticsearch

Langkah 1: Siapkan sumber dan tujuan

  1. Buat tabel di PolarDB-X 1.0 dan masukkan data uji. Contoh berikut membuat tabel food:

    CREATE TABLE food(
      id INT PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(32),
      insert_time DATETIME,
      update_time DATETIME
    );

    Kolom utama:

    • id: Primary key. Nilainya dipetakan ke bidang _id di Elasticsearch. Diperlukan agar pipeline sinkronisasi dapat mengidentifikasi dan menimpa dokumen dengan benar.

    • update_time: Timestamp pelacakan. Plug-in logstash-input-jdbc menggunakan bidang ini untuk mendeteksi perubahan sejak pengambilan terakhir, sehingga sinkronisasi inkremental dapat berfungsi.

    • insert_time: Mencatat kapan baris pertama kali dibuat. Tidak wajib untuk sinkronisasi, tetapi berguna untuk audit.

    Masukkan data sampel:

    INSERT INTO food VALUES(null, 'Chocolates', NOW(), NOW());
    INSERT INTO food VALUES(null, 'Yogurt', NOW(), NOW());
    INSERT INTO food VALUES(null, 'Ham sausage', NOW(), NOW());
  2. Aktifkan fitur Auto Indexing untuk kluster Elasticsearch Anda. Untuk detailnya, lihat Access and configure an Elasticsearch cluster.

  3. Unggah driver JDBC MySQL ke kluster Logstash. Versi driver harus kompatibel dengan instans PolarDB-X 1.0 Anda. Contoh ini menggunakan mysql-connector-java-5.1.35. Untuk instruksi pengunggahan, lihat Configure third-party libraries.

    Catatan

    Driver JDBC MySQL direkomendasikan untuk koneksi ke PolarDB-X 1.0. Driver JDBC PolarDB mungkin tidak berfungsi dengan PolarDB-X 2.0.

  4. Tambahkan alamat IP node kluster Logstash ke daftar putih alamat IP instans PolarDB-X 1.0 Anda. Temukan alamat IP node di halaman Basic Information kluster Logstash di Konsol Elasticsearch. Untuk instruksi pendaftaran daftar putih, lihat Set an IP address whitelist.

Langkah 2: Konfigurasikan pipeline Logstash

  1. Buka halaman Logstash Clusters di Konsol Alibaba Cloud Elasticsearch.

  2. Di bilah navigasi atas, pilih wilayah tempat kluster Anda berada.

  3. Di halaman Logstash Clusters, temukan kluster Anda dan klik ID-nya.

  4. Di panel navigasi kiri, klik Pipelines.

  5. Di halaman Pipelines, klik Create Pipeline.

  6. Di halaman Create Task, masukkan ID pipeline di bidang Pipeline ID, lalu masukkan konfigurasi berikut di bidang Config Settings. Ganti <Logstash cluster ID>, <Database name>, dan kredensial dengan nilai aktual Anda. Untuk menemukan ID kluster Logstash Anda, lihat Overview of the Logstash Clusters page.

    Penting

    Untuk keamanan, selalu tambahkan allowLoadLocalInfile=false&autoDeserialize=false ke jdbc_connection_string saat menggunakan driver JDBC. Tanpa parameter ini, pemeriksaan konfigurasi pipeline gagal saat Anda menyimpan.

    Parameter Masukan Plugin

    Parameter Tipe Default Deskripsi
    jdbc_driver_class String Kelas driver JDBC. Untuk MySQL, gunakan com.mysql.jdbc.Driver.
    jdbc_driver_library String Path ke file driver JDBC di kluster Logstash. Backend menyediakan path dalam format /ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/. Untuk detailnya, lihat Configure third-party libraries.
    jdbc_connection_string String String koneksi JDBC, termasuk titik akhir, nomor port, dan nama database instans PolarDB-X 1.0.
    jdbc_user String Username untuk mengakses database PolarDB-X 1.0.
    jdbc_password String Password untuk mengakses database PolarDB-X 1.0.
    jdbc_paging_enabled Boolean false Apakah pagination untuk hasil kueri diaktifkan.
    jdbc_page_size Integer Jumlah catatan per halaman saat pagination diaktifkan.
    statement String Pernyataan SQL yang digunakan untuk mengkueri catatan. Gunakan :sql_last_value sebagai placeholder untuk nilai terakhir yang dilacak. Contoh ini menggunakan >= untuk menyertakan catatan yang dimasukkan atau diperbarui tepat pada timestamp terakhir yang dilacak, sehingga meminimalkan kehilangan data di batas pengambilan.
    schedule String Ekspresi cron yang mengontrol interval polling. * * * * * menjalankan kueri setiap menit.
    record_last_run Boolean false Apakah nilai eksekusi terakhir dipertahankan. Saat true, nilai tracking_column dari kueri terakhir disimpan di file di last_run_metadata_path.
    last_run_metadata_path String Path file tempat nilai eksekusi terakhir disimpan. Gunakan path dalam format /ssd/1/<Logstash cluster ID>/logstash/data/ yang disediakan oleh Alibaba Cloud Logstash. Logstash membuat file ini secara otomatis. Anda tidak dapat melihat isi file tersebut. Sistem tidak menghapus data di path ini—pastikan disk Anda memiliki ruang yang cukup.
    clean_run Boolean false Apakah nilai eksekusi terakhir diabaikan dan dimulai dari catatan pertama di database. Atur ke true untuk memaksa resinkronisasi penuh.
    use_column_value Boolean false Apakah nilai tracking_column digunakan sebagai :sql_last_value. Saat false, :sql_last_value adalah timestamp eksekusi kueri terakhir.
    tracking_column_type String numeric Tipe data kolom pelacakan. Nilai valid: numeric, timestamp.
    tracking_column String Kolom yang dilacak untuk sinkronisasi inkremental. Nilainya harus dalam urutan menaik. Atur ke kolom yang menyimpan waktu penyisipan atau pembaruan (misalnya, update_time).

    Output Plugin Parameters

    Parameter Tipe Default Deskripsi
    hosts String URL titik akhir internal kluster Elasticsearch, dalam format http://<internal endpoint>:9200. Temukan titik akhir di halaman Basic Information kluster. Untuk detailnya, lihat View the basic information of a cluster.
    user String elastic Username untuk mengakses kluster Elasticsearch.
    password String Password untuk akun elastic. Jika lupa, Anda dapat mengatur ulang. Untuk detailnya, lihat Reset the access password for an Elasticsearch cluster.
    index String Nama indeks Elasticsearch tempat data yang disinkronkan disimpan.
    document_id String ID dokumen di Elasticsearch. Atur ke %{id} untuk menggunakan bidang id dari PolarDB-X 1.0, sehingga setiap catatan database dipetakan ke tepat satu dokumen Elasticsearch.
    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar"
        jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
        jdbc_user => "db_user"
        jdbc_password => "db_password"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select * from food where update_time >= :sql_last_value"
        schedule => "* * * * *"
        record_last_run => true
        last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
        clean_run => false
        tracking_column_type => "timestamp"
        use_column_value => true
        tracking_column => "update_time"
      }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200"
        user => "elastic"
        password => "es_password"
        index => "drds_test"
        document_id => "%{id}"
      }
    }

    Untuk daftar lengkap parameter plug-in input, lihat Logstash JDBC input plugin. Untuk sintaks konfigurasi pipeline umum, lihat Logstash configuration files.

  7. Klik Next untuk mengonfigurasi parameter pipeline.

    Peringatan

    Menyimpan dan deploy pipeline memicu restart kluster Logstash. Verifikasi bahwa restart tidak memengaruhi layanan Anda sebelum melanjutkan.

    Parameter Default Deskripsi
    Pipeline Workers Jumlah vCPU Jumlah thread pekerja yang menjalankan plug-in filter dan output secara paralel. Tingkatkan nilai ini jika event tertunda atau sumber daya CPU kurang dimanfaatkan.
    Pipeline Batch Size 125 Jumlah maksimum event yang dikumpulkan thread pekerja dari plug-in input sebelum menjalankan plug-in filter dan output. Nilai yang lebih besar meningkatkan throughput tetapi mengonsumsi lebih banyak memori. Untuk mengakomodasi batch yang lebih besar, tingkatkan ukuran heap JVM menggunakan variabel LS_HEAP_SIZE.
    Pipeline Batch Delay 50 ms Waktu tunggu sebelum menetapkan batch kecil ke thread pekerja pipeline.
    Queue Type MEMORY Model antrian internal untuk buffering event. MEMORY menggunakan antrian berbasis memori. PERSISTED menggunakan antrian persisten berbasis disk dengan dukungan ACK.
    Queue Max Bytes 1024 MB Ukuran maksimum antrian. Harus lebih kecil dari kapasitas disk total.
    Queue Checkpoint Writes 1024 Jumlah event yang ditulis sebelum checkpoint diberlakukan (hanya untuk antrian persisten). Atur ke 0 untuk tanpa batas.

    Configure pipeline parameters

  8. Klik Save atau Save and Deploy.

    • Save: Menyimpan konfigurasi pipeline dan memicu perubahan kluster, tetapi tidak mengaktifkan pipeline. Untuk mengaktifkannya, buka halaman Pipelines, temukan pipeline tersebut, lalu klik Deploy di kolom Actions.

    • Save and Deploy: Menyimpan dan segera merestart kluster Logstash untuk mengaktifkan pipeline.

Langkah 3: Verifikasi hasil

  1. Login ke Konsol Kibana kluster Elasticsearch. Untuk detailnya, lihat Log on to the Kibana console.

  2. Di panel navigasi kiri, klik Dev Tools.

  3. Di tab Console, jalankan kueri berikut untuk memastikan tiga catatan sampel telah disinkronkan:

    GET drds_test/_count
    {
      "query": {"match_all": {}}
    }

    Tanggapan yang diharapkan:

    {
      "count": 3,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      }
    }
  4. Perbarui catatan yang ada dan masukkan catatan baru di PolarDB-X 1.0:

    UPDATE food SET name='Chocolates', update_time=NOW() WHERE id = 1;
    INSERT INTO food VALUES(null, 'Egg', NOW(), NOW());
  5. Setelah siklus polling berikutnya selesai (dalam satu menit), verifikasi pembaruan di Kibana. Kueri catatan yang diperbarui:

    GET drds_test/_search
    {
      "query": {
        "match": {
          "name": "Chocolates"
        }
      }
    }

    Jika perintah berhasil dijalankan, hasil seperti pada gambar berikut akan ditampilkan.Returned result Kueri semua catatan untuk memastikan baris baru telah ditambahkan:

    GET drds_test/_search
    {
      "query": {
        "match_all": {}
      }
    }

    Jika perintah berhasil dijalankan, hasil seperti pada gambar berikut akan ditampilkan.Returned result

FAQ

FAQ about data transfer by using Logstash