Topik ini menjelaskan cara menggunakan Alibaba Cloud Logstash untuk menyinkronkan data dari database PolarDB-X 1.0 ke Alibaba Cloud Elasticsearch guna mendukung pencarian teks lengkap dan analitik semantik.
Cara kerja
Alibaba Cloud Logstash menggunakan plug-in logstash-input-jdbc untuk secara berkala mengambil catatan dari PolarDB-X 1.0 yang telah dimasukkan atau diperbarui sejak pengambilan terakhir. Plug-in ini diinstal secara default pada semua kluster Logstash dan tidak dapat dihapus.
Agar sinkronisasi inkremental berfungsi dengan benar, dua kondisi berikut harus dipenuhi:
-
Pemetaan bidang ID: Bidang
_iddi Elasticsearch harus sesuai dengan bidangiddi PolarDB-X 1.0. Pemetaan ini memungkinkan pipeline sinkronisasi menimpa dokumen Elasticsearch yang tepat ketika suatu catatan diperbarui di PolarDB-X 1.0. -
Bidang timestamp: Setiap catatan di tabel sumber harus mencakup bidang yang menyimpan waktu penyisipan atau pembaruan. Plug-in melacak bidang ini untuk menentukan catatan mana yang akan disertakan dalam setiap pengambilan—hanya catatan dengan timestamp lebih baru daripada pengambilan sebelumnya yang disinkronkan.
Gunakan pendekatan ini jika Anda perlu menyinkronkan data lengkap dengan latensi beberapa detik atau jika Anda perlu mengkueri dan menyinkronkan catatan tertentu pada interval terjadwal.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
-
Instans PolarDB-X 1.0 dengan database yang telah dibuat
-
Kluster Alibaba Cloud Elasticsearch (contoh ini menggunakan V6.7, Edisi Standar)
-
Kluster Alibaba Cloud Logstash
Deploy ketiga sumber daya tersebut dalam virtual private cloud (VPC) yang sama untuk meminimalkan latensi jaringan dan menghindari konfigurasi tambahan.
-
Untuk membuat instans dan database PolarDB-X 1.0, lihat Create a PolarDB-X 1.0 instance.
-
Untuk membuat kluster Elasticsearch, lihat Create an Alibaba Cloud Elasticsearch cluster.
-
Untuk membuat kluster Logstash, lihat Create an Alibaba Cloud Logstash cluster.
Untuk mengumpulkan data dari Internet atau mentransfer data ke Internet melalui Logstash, konfigurasikan gerbang Network Address Translation (NAT). Lihat Configure a NAT gateway for data transmission over the Internet.
Batasan
-
Penghapusan tidak disinkronkan: Plug-in logstash-input-jdbc tidak dapat menyebarkan penghapusan dari PolarDB-X 1.0 ke Elasticsearch. Untuk menangani catatan yang dihapus, gunakan salah satu strategi berikut:
-
Soft delete (disarankan): Tambahkan bidang
is_deletedke tabel Anda. Saat catatan dihapus secara logis, aturis_deletedmenjaditrue. Plug-in menyinkronkan perubahan ini ke Elasticsearch, tempat Anda dapat memfilter dokumen yang dihapus secara logis dalam kueri. -
Penghapusan eksternal: Pastikan sistem apa pun yang bertanggung jawab menghapus catatan di PolarDB-X 1.0 juga menjalankan perintah penghapusan yang sesuai langsung pada kluster Elasticsearch.
-
-
Persyaratan bidang ID: Bidang
_iddi Elasticsearch harus sesuai dengan bidangiddi PolarDB-X 1.0. Ini diperlukan agar pipeline dapat menimpa dokumen dengan benar saat catatan diperbarui. Di Elasticsearch, penimpaan setara dengan menghapus dokumen asli dan mengindeks yang diperbarui—proses ini sama efisiennya dengan operasi pembaruan native. -
Persyaratan bidang timestamp: Setiap catatan yang dimasukkan atau diperbarui harus mencakup bidang yang merekam waktu penyisipan atau pembaruan. Catatan tanpa bidang ini tidak disertakan dalam sinkronisasi inkremental.
-
Tracking column order: Kolom yang ditentukan sebagai
tracking_columnharus memiliki nilai dalam urutan menaik.
Menyinkronkan data dari PolarDB-X 1.0 ke Elasticsearch
Langkah 1: Siapkan sumber dan tujuan
-
Buat tabel di PolarDB-X 1.0 dan masukkan data uji. Contoh berikut membuat tabel
food:CREATE TABLE food( id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(32), insert_time DATETIME, update_time DATETIME );Kolom utama:
-
id: Primary key. Nilainya dipetakan ke bidang_iddi Elasticsearch. Diperlukan agar pipeline sinkronisasi dapat mengidentifikasi dan menimpa dokumen dengan benar. -
update_time: Timestamp pelacakan. Plug-in logstash-input-jdbc menggunakan bidang ini untuk mendeteksi perubahan sejak pengambilan terakhir, sehingga sinkronisasi inkremental dapat berfungsi. -
insert_time: Mencatat kapan baris pertama kali dibuat. Tidak wajib untuk sinkronisasi, tetapi berguna untuk audit.
Masukkan data sampel:
INSERT INTO food VALUES(null, 'Chocolates', NOW(), NOW()); INSERT INTO food VALUES(null, 'Yogurt', NOW(), NOW()); INSERT INTO food VALUES(null, 'Ham sausage', NOW(), NOW()); -
-
Aktifkan fitur Auto Indexing untuk kluster Elasticsearch Anda. Untuk detailnya, lihat Access and configure an Elasticsearch cluster.
-
Unggah driver JDBC MySQL ke kluster Logstash. Versi driver harus kompatibel dengan instans PolarDB-X 1.0 Anda. Contoh ini menggunakan
mysql-connector-java-5.1.35. Untuk instruksi pengunggahan, lihat Configure third-party libraries.CatatanDriver JDBC MySQL direkomendasikan untuk koneksi ke PolarDB-X 1.0. Driver JDBC PolarDB mungkin tidak berfungsi dengan PolarDB-X 2.0.
-
Tambahkan alamat IP node kluster Logstash ke daftar putih alamat IP instans PolarDB-X 1.0 Anda. Temukan alamat IP node di halaman Basic Information kluster Logstash di Konsol Elasticsearch. Untuk instruksi pendaftaran daftar putih, lihat Set an IP address whitelist.
Langkah 2: Konfigurasikan pipeline Logstash
-
Buka halaman Logstash Clusters di Konsol Alibaba Cloud Elasticsearch.
-
Di bilah navigasi atas, pilih wilayah tempat kluster Anda berada.
-
Di halaman Logstash Clusters, temukan kluster Anda dan klik ID-nya.
-
Di panel navigasi kiri, klik Pipelines.
-
Di halaman Pipelines, klik Create Pipeline.
-
Di halaman Create Task, masukkan ID pipeline di bidang Pipeline ID, lalu masukkan konfigurasi berikut di bidang Config Settings. Ganti
<Logstash cluster ID>,<Database name>, dan kredensial dengan nilai aktual Anda. Untuk menemukan ID kluster Logstash Anda, lihat Overview of the Logstash Clusters page.PentingUntuk keamanan, selalu tambahkan
allowLoadLocalInfile=false&autoDeserialize=falsekejdbc_connection_stringsaat menggunakan driver JDBC. Tanpa parameter ini, pemeriksaan konfigurasi pipeline gagal saat Anda menyimpan.Parameter Masukan Plugin
Parameter Tipe Default Deskripsi jdbc_driver_classString — Kelas driver JDBC. Untuk MySQL, gunakan com.mysql.jdbc.Driver.jdbc_driver_libraryString — Path ke file driver JDBC di kluster Logstash. Backend menyediakan path dalam format /ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/. Untuk detailnya, lihat Configure third-party libraries.jdbc_connection_stringString — String koneksi JDBC, termasuk titik akhir, nomor port, dan nama database instans PolarDB-X 1.0. jdbc_userString — Username untuk mengakses database PolarDB-X 1.0. jdbc_passwordString — Password untuk mengakses database PolarDB-X 1.0. jdbc_paging_enabledBoolean falseApakah pagination untuk hasil kueri diaktifkan. jdbc_page_sizeInteger — Jumlah catatan per halaman saat pagination diaktifkan. statementString — Pernyataan SQL yang digunakan untuk mengkueri catatan. Gunakan :sql_last_valuesebagai placeholder untuk nilai terakhir yang dilacak. Contoh ini menggunakan>=untuk menyertakan catatan yang dimasukkan atau diperbarui tepat pada timestamp terakhir yang dilacak, sehingga meminimalkan kehilangan data di batas pengambilan.scheduleString — Ekspresi cron yang mengontrol interval polling. * * * * *menjalankan kueri setiap menit.record_last_runBoolean falseApakah nilai eksekusi terakhir dipertahankan. Saat true, nilaitracking_columndari kueri terakhir disimpan di file dilast_run_metadata_path.last_run_metadata_pathString — Path file tempat nilai eksekusi terakhir disimpan. Gunakan path dalam format /ssd/1/<Logstash cluster ID>/logstash/data/yang disediakan oleh Alibaba Cloud Logstash. Logstash membuat file ini secara otomatis. Anda tidak dapat melihat isi file tersebut. Sistem tidak menghapus data di path ini—pastikan disk Anda memiliki ruang yang cukup.clean_runBoolean falseApakah nilai eksekusi terakhir diabaikan dan dimulai dari catatan pertama di database. Atur ke trueuntuk memaksa resinkronisasi penuh.use_column_valueBoolean falseApakah nilai tracking_columndigunakan sebagai:sql_last_value. Saatfalse,:sql_last_valueadalah timestamp eksekusi kueri terakhir.tracking_column_typeString numericTipe data kolom pelacakan. Nilai valid: numeric,timestamp.tracking_columnString — Kolom yang dilacak untuk sinkronisasi inkremental. Nilainya harus dalam urutan menaik. Atur ke kolom yang menyimpan waktu penyisipan atau pembaruan (misalnya, update_time).Output Plugin Parameters
Parameter Tipe Default Deskripsi hostsString — URL titik akhir internal kluster Elasticsearch, dalam format http://<internal endpoint>:9200. Temukan titik akhir di halaman Basic Information kluster. Untuk detailnya, lihat View the basic information of a cluster.userString elasticUsername untuk mengakses kluster Elasticsearch. passwordString — Password untuk akun elastic. Jika lupa, Anda dapat mengatur ulang. Untuk detailnya, lihat Reset the access password for an Elasticsearch cluster.indexString — Nama indeks Elasticsearch tempat data yang disinkronkan disimpan. document_idString — ID dokumen di Elasticsearch. Atur ke %{id}untuk menggunakan bidangiddari PolarDB-X 1.0, sehingga setiap catatan database dipetakan ke tepat satu dokumen Elasticsearch.input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar" jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false" jdbc_user => "db_user" jdbc_password => "db_password" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select * from food where update_time >= :sql_last_value" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt" clean_run => false tracking_column_type => "timestamp" use_column_value => true tracking_column => "update_time" } } filter { } output { elasticsearch { hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200" user => "elastic" password => "es_password" index => "drds_test" document_id => "%{id}" } }Untuk daftar lengkap parameter plug-in input, lihat Logstash JDBC input plugin. Untuk sintaks konfigurasi pipeline umum, lihat Logstash configuration files.
-
Klik Next untuk mengonfigurasi parameter pipeline.
PeringatanMenyimpan dan deploy pipeline memicu restart kluster Logstash. Verifikasi bahwa restart tidak memengaruhi layanan Anda sebelum melanjutkan.
Parameter Default Deskripsi Pipeline Workers Jumlah vCPU Jumlah thread pekerja yang menjalankan plug-in filter dan output secara paralel. Tingkatkan nilai ini jika event tertunda atau sumber daya CPU kurang dimanfaatkan. Pipeline Batch Size 125 Jumlah maksimum event yang dikumpulkan thread pekerja dari plug-in input sebelum menjalankan plug-in filter dan output. Nilai yang lebih besar meningkatkan throughput tetapi mengonsumsi lebih banyak memori. Untuk mengakomodasi batch yang lebih besar, tingkatkan ukuran heap JVM menggunakan variabel LS_HEAP_SIZE.Pipeline Batch Delay 50 ms Waktu tunggu sebelum menetapkan batch kecil ke thread pekerja pipeline. Queue Type MEMORY Model antrian internal untuk buffering event. MEMORY menggunakan antrian berbasis memori. PERSISTED menggunakan antrian persisten berbasis disk dengan dukungan ACK. Queue Max Bytes 1024 MB Ukuran maksimum antrian. Harus lebih kecil dari kapasitas disk total. Queue Checkpoint Writes 1024 Jumlah event yang ditulis sebelum checkpoint diberlakukan (hanya untuk antrian persisten). Atur ke 0untuk tanpa batas.
-
Klik Save atau Save and Deploy.
-
Save: Menyimpan konfigurasi pipeline dan memicu perubahan kluster, tetapi tidak mengaktifkan pipeline. Untuk mengaktifkannya, buka halaman Pipelines, temukan pipeline tersebut, lalu klik Deploy di kolom Actions.
-
Save and Deploy: Menyimpan dan segera merestart kluster Logstash untuk mengaktifkan pipeline.
-
Langkah 3: Verifikasi hasil
-
Login ke Konsol Kibana kluster Elasticsearch. Untuk detailnya, lihat Log on to the Kibana console.
-
Di panel navigasi kiri, klik Dev Tools.
-
Di tab Console, jalankan kueri berikut untuk memastikan tiga catatan sampel telah disinkronkan:
GET drds_test/_count { "query": {"match_all": {}} }Tanggapan yang diharapkan:
{ "count": 3, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 } } -
Perbarui catatan yang ada dan masukkan catatan baru di PolarDB-X 1.0:
UPDATE food SET name='Chocolates', update_time=NOW() WHERE id = 1; INSERT INTO food VALUES(null, 'Egg', NOW(), NOW()); -
Setelah siklus polling berikutnya selesai (dalam satu menit), verifikasi pembaruan di Kibana. Kueri catatan yang diperbarui:
GET drds_test/_search { "query": { "match": { "name": "Chocolates" } } }Jika perintah berhasil dijalankan, hasil seperti pada gambar berikut akan ditampilkan.
Kueri semua catatan untuk memastikan baris baru telah ditambahkan:GET drds_test/_search { "query": { "match_all": {} } }Jika perintah berhasil dijalankan, hasil seperti pada gambar berikut akan ditampilkan.
