Jika data bisnis Anda disimpan di PolarDB-X 1.0 dan Anda ingin melakukan pencarian teks lengkap serta analitik semantik pada data tersebut, Anda dapat menggunakan Alibaba Cloud Elasticsearch dan Alibaba Cloud Logstash. Topik ini menjelaskan cara menggunakan Alibaba Cloud Logstash untuk menyinkronkan data dari PolarDB-X 1.0 ke Alibaba Cloud Elasticsearch secara real-time.
Informasi latar belakang
Alibaba Cloud Logstash adalah alat pemrosesan data yang kuat yang dapat digunakan untuk mengumpulkan, mentransformasi, mengoptimalkan, dan menghasilkan data. Alibaba Cloud Logstash menyediakan plug-in logstash-input-jdbc, yang diinstal secara default pada kluster Logstash dan tidak dapat dihapus. Plug-in ini dapat menanyakan beberapa catatan data di PolarDB-X 1.0 sekaligus dan menyinkronkannya ke Alibaba Cloud Elasticsearch. Plug-in ini menggunakan metode round-robin untuk mengidentifikasi catatan data yang baru saja dimasukkan atau diperbarui di PolarDB-X 1.0 secara berkala dan menyinkronkannya ke Alibaba Cloud Elasticsearch. Jika Anda ingin menyinkronkan seluruh data dan dapat menerima latensi beberapa detik, atau jika Anda ingin menanyakan data tertentu pada satu waktu dan menyinkronkannya, Anda dapat menggunakan Alibaba Cloud Logstash.
Prasyarat
Sebuah instans PolarDB-X 1.0, kluster Alibaba Cloud Elasticsearch, dan kluster Alibaba Cloud Logstash telah dibuat. Selain itu, sebuah database telah dibuat di dalam instans PolarDB-X 1.0. Kami merekomendasikan agar Anda membuat instans PolarDB-X 1.0, kluster Elasticsearch, dan kluster Logstash di dalam virtual private cloud (VPC) yang sama.
Untuk informasi lebih lanjut tentang cara membuat instans PolarDB-X 1.0 dan membuat database di dalam instans tersebut, lihat Buat Instans PolarDB-X 1.0.
Untuk informasi lebih lanjut tentang cara membuat kluster Alibaba Cloud Elasticsearch, lihat Buat Kluster Alibaba Cloud Elasticsearch. Dalam contoh ini, kluster Elasticsearch V6.7 Edisi Standar dibuat.
Untuk informasi lebih lanjut tentang cara membuat kluster Alibaba Cloud Logstash, lihat Buat Kluster Alibaba Cloud Logstash.
CatatanJika Anda ingin menggunakan Logstash untuk mengumpulkan data dari Internet atau mentransfer data yang dikumpulkan ke Internet, Anda harus mengonfigurasi Network Address Translation (NAT) gateway dan menggunakan gateway tersebut untuk menghubungkan kluster Logstash Anda ke Internet. Untuk informasi lebih lanjut, lihat Konfigurasikan NAT Gateway untuk Transmisi Data melalui Internet.
Batasan
Nilai bidang _id di kluster Elasticsearch harus sama dengan nilai bidang id di database PolarDB-X 1.0.
Kondisi ini memastikan bahwa tugas sinkronisasi data dapat membuat pemetaan antara catatan data di database PolarDB-X 1.0 dan dokumen di kluster Elasticsearch. Jika Anda memperbarui catatan data di database PolarDB-X 1.0, tugas sinkronisasi data akan menggunakan catatan data yang diperbarui untuk menimpa dokumen yang memiliki ID yang sama di kluster Elasticsearch.
CatatanPada dasarnya, operasi pembaruan di Elasticsearch menghapus dokumen asli dan mengindeks dokumen baru. Oleh karena itu, operasi penimpaan ini sama efisiennya dengan operasi pembaruan yang dilakukan oleh tugas sinkronisasi data.
Jika Anda memasukkan catatan data ke dalam atau memperbarui catatan data di database PolarDB-X 1.0, catatan data tersebut harus berisi bidang yang menunjukkan waktu ketika catatan data tersebut dimasukkan atau diperbarui.
Setiap kali plug-in logstash-input-jdbc melakukan round robin, plug-in mencatat waktu ketika catatan data terakhir dalam round robin dimasukkan atau diperbarui di database PolarDB-X 1.0. Logstash hanya menyinkronkan catatan data yang memenuhi persyaratan berikut dari database PolarDB-X 1.0: Waktu ketika catatan data dimasukkan atau diperbarui di database PolarDB-X 1.0 lebih lambat daripada waktu ketika catatan data terakhir dalam round robin sebelumnya dimasukkan atau diperbarui di database PolarDB-X 1.0.
PentingJika Anda menghapus catatan data di database PolarDB-X 1.0, plug-in logstash-input-jdbc tidak dapat menghapus dokumen yang memiliki ID yang sama dari kluster Elasticsearch. Untuk menghapus dokumen dari kluster Elasticsearch, Anda harus menjalankan perintah terkait di kluster Elasticsearch.
Prosedur
Langkah 1: Persiapan awal
Buat tabel di database PolarDB-X 1.0 dan siapkan data uji di tabel tersebut.
Dalam contoh ini, pernyataan berikut digunakan untuk membuat tabel:
CREATE table food( id int PRIMARY key AUTO_INCREMENT, name VARCHAR (32), insert_time DATETIME, update_time DATETIME );Pernyataan berikut digunakan untuk memasukkan data ke dalam tabel:
INSERT INTO food values(null,'Chocolates',now(),now()); INSERT INTO food values(null,'Yogurt',now(),now()); INSERT INTO food values(null,'Ham sausage',now(),now());Aktifkan fitur Auto Indexing untuk kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Akses dan Konfigurasikan Kluster Elasticsearch.
Di kluster Logstash, unggah driver SQL JDBC yang versinya kompatibel dengan versi database PolarDB-X 1.0. Untuk informasi lebih lanjut, lihat Konfigurasikan Pustaka Pihak Ketiga. Dalam contoh ini, driver mysql-connector-java-5.1.35 digunakan.
CatatanDalam contoh ini, driver MySQL JDBC digunakan untuk terhubung ke database PolarDB-X 1.0. Anda juga dapat menggunakan driver PolarDB JDBC untuk terhubung ke database PolarDB-X 1.0. Namun, untuk database PolarDB-X 2.0, driver PolarDB JDBC mungkin tidak bekerja. Kami merekomendasikan agar Anda menggunakan driver MySQL JDBC.
Peroleh alamat IP node di kluster Logstash di halaman Informasi Dasar kluster Logstash di konsol Elasticsearch. Kemudian, tambahkan alamat IP tersebut ke daftar putih alamat IP instans PolarDB-X 1.0. Untuk informasi lebih lanjut, lihat Atur Daftar Putih Alamat IP.
Langkah 2: Konfigurasikan pipeline Logstash
Buka halaman Kluster Logstash di Konsol Elasticsearch Alibaba Cloud.
Navigasikan ke kluster yang diinginkan.
Di bilah navigasi atas, pilih wilayah tempat kluster berada.
Di halaman Logstash Clusters, temukan kluster dan klik ID-nya.
- Di panel navigasi sisi kiri, klik Pipelines.
Di halaman Pipeline, klik Create Pipeline.
Di halaman Create Task, masukkan ID pipeline di bidang Pipeline ID dan masukkan konfigurasi yang diperlukan di bidang Config Settings.
Dalam contoh ini, konfigurasi berikut dimasukkan di bidang Config Settings:
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar" jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false" jdbc_user => "db_user" jdbc_password => "db_password" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select * from food where update_time >= :sql_last_value" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt" clean_run => false tracking_column_type => "timestamp" use_column_value => true tracking_column => "update_time" } } filter { } output { elasticsearch { hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200" user => "elastic" password => "es_password" index => "drds_test" document_id => "%{id}" } }CatatanAnda harus mengganti
<Logstash cluster ID>di kode sebelumnya dengan ID kluster Logstash yang Anda gunakan. Untuk informasi lebih lanjut tentang cara memperoleh ID, lihat Ikhtisar Halaman Kluster Logstash.Tabel 1. Parameter di Bagian Input Parameter
Deskripsi
jdbc_driver_class
Kelas driver JDBC.
jdbc_driver_library
Path file driver JDBC. Untuk informasi lebih lanjut, lihat Konfigurasikan pustaka pihak ketiga.
jdbc_connection_string
String koneksi JDBC yang digunakan untuk terhubung ke database PolarDB-X 1.0. String koneksi JDBC berisi endpoint, nomor port, dan nama database PolarDB-X 1.0.
jdbc_user
Nama pengguna yang digunakan untuk mengakses database PolarDB-X 1.0.
jdbc_password
Kata sandi yang digunakan untuk mengakses database PolarDB-X 1.0.
jdbc_paging_enabled
Menentukan apakah paging diaktifkan. Nilai default: false.
jdbc_page_size
Jumlah entri per halaman.
statement
Pernyataan SQL.
schedule
Interval eksekusi pernyataan SQL. Nilai
* * * * *menunjukkan bahwa pernyataan SQL dieksekusi setiap menit.record_last_run
Menentukan apakah hasil eksekusi terakhir dicatat. Jika parameter ini diatur ke true, nilai tracking_column dalam hasil eksekusi terakhir disimpan di file di path yang ditentukan menggunakan parameter last_run_metadata_path.
last_run_metadata_path
Path file yang berisi waktu eksekusi terakhir. Path file disediakan di backend. Path ini dalam format /ssd/1/<Logstash cluster ID>/logstash/data/. Setelah Anda menentukan path, Logstash secara otomatis menghasilkan file di path tersebut, tetapi Anda tidak dapat melihat data di file tersebut.
clean_run
Menentukan apakah path yang ditentukan oleh parameter last_run_metadata_path dibersihkan. Nilai default: false. Jika parameter ini diatur ke true, setiap query dimulai dari entri pertama di database.
use_column_value
Menentukan apakah nilai kolom tertentu dicatat.
tracking_column_type
Tipe kolom yang nilainya ingin Anda lacak. Nilai default: numeric.
tracking_column
Kolom yang nilainya ingin Anda lacak. Nilai-nilai tersebut harus diurutkan dalam urutan menaik. Dalam banyak kasus, kolom ini adalah kunci utama tabel.
Tabel 2. Parameter di Bagian Output Parameter
Deskripsi
hosts
URL yang digunakan untuk mengakses kluster Elasticsearch. Tentukan URL dalam format berikut: http://<Endpoint internal kluster Elasticsearch>:9200. Anda dapat memperoleh endpoint internal dari halaman Informasi Dasar kluster. Untuk informasi lebih lanjut, lihat Lihat informasi dasar kluster.
user
Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch. Nama pengguna default adalah elastic.
password
Kata sandi akun elastic. Kata sandi akun elastic ditentukan saat Anda membuat kluster Elasticsearch. Jika Anda lupa kata sandi, Anda dapat menyetel ulang. Untuk informasi lebih lanjut tentang prosedur dan perhatian saat menyetel ulang kata sandi, lihat Setel ulang kata sandi akses untuk kluster Elasticsearch.
index
Nama indeks di kluster Elasticsearch.
document_id
ID dokumen di kluster Elasticsearch. Atur parameter ini ke %{id}, yang menunjukkan bahwa ID dokumen sama dengan ID catatan data di database PolarDB-X 1.0.
PentingKonfigurasi sebelumnya didasarkan pada data uji. Anda dapat mengonfigurasi pipeline berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut tentang parameter lain yang didukung oleh plug-in input, lihat Plug-in Input Jdbc Logstash.
Jika konfigurasi Anda berisi parameter serupa dengan
last_run_metadata_path, path file harus disediakan oleh Alibaba Cloud Logstash. Path dalam format/ssd/1/<Logstash cluster ID>/logstash/data/disediakan di backend dan tersedia untuk pengujian. Sistem tidak menghapus data di path ini. Pastikan disk Anda memiliki ruang penyimpanan yang cukup saat menggunakan path ini. Setelah Anda menentukan path, Logstash secara otomatis menghasilkan file di path tersebut, tetapi Anda tidak dapat melihat data di file tersebut.Untuk tujuan keamanan, jika Anda menentukan driver JDBC saat mengonfigurasi pipeline, Anda harus menambahkan
allowLoadLocalInfile=false&autoDeserialize=falsedi akhir parameterjdbc_connection_string, sepertijdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?allowLoadLocalInfile=false&autoDeserialize=false". Jika tidak, saat Anda menambahkan file konfigurasi untuk pipeline Logstash, sistem menampilkan pesan kesalahan yang menunjukkan kegagalan pemeriksaan.
Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter di bidang Config Settings, lihat File Konfigurasi Logstash.
Klik Next untuk mengonfigurasi parameter pipeline.

Parameter
Deskripsi
Pipeline Workers
Jumlah thread pekerja yang menjalankan filter dan plug-in output pipeline secara paralel. Jika ada backlog acara atau beberapa sumber daya CPU tidak digunakan, kami merekomendasikan agar Anda meningkatkan jumlah thread untuk memaksimalkan pemanfaatan CPU. Nilai default parameter ini adalah jumlah vCPU.
Pipeline Batch Size
Jumlah maksimum acara yang dapat dikumpulkan oleh satu thread pekerja dari plug-in input sebelum mencoba menjalankan filter dan plug-in output. Jika Anda mengatur parameter ini ke nilai besar, satu thread pekerja dapat mengumpulkan lebih banyak acara tetapi mengonsumsi memori lebih besar. Jika Anda ingin memastikan bahwa thread pekerja memiliki memori yang cukup untuk mengumpulkan lebih banyak acara, tentukan variabel LS_HEAP_SIZE untuk meningkatkan ukuran heap Java virtual machine (JVM). Nilai default: 125.
Pipeline Batch Delay
Waktu tunggu untuk sebuah acara. Waktu ini terjadi sebelum Anda menetapkan batch kecil ke thread pekerja pipeline dan setelah Anda membuat tugas batch untuk acara pipeline. Nilai default: 50. Unit: milidetik.
Queue Type
Model antrian internal untuk menyangga acara. Nilai valid:
MEMORY: antrian berbasis memori tradisional. Ini adalah nilai default.
PERSISTED: antrian berbasis disk ACKed, yaitu antrian persisten.
Queue Max Bytes
Ukuran data maksimum untuk antrian. Unit: MB. Nilai valid: bilangan bulat dari
1 hingga 2<sup>53</sup> - 1. Nilai default: 1024.CatatanNilai tersebut harus lebih kecil dari kapasitas total disk Anda.
Queue Checkpoint Writes
Jumlah maksimum acara yang ditulis sebelum pemeriksaan diberlakukan ketika antrian persisten diaktifkan. Nilai 0 menunjukkan tidak ada batasan. Nilai default: 1024.
PeringatanSetelah Anda mengonfigurasi parameter, Anda harus menyimpan pengaturan dan menerapkan pipeline. Ini memicu restart kluster Logstash. Sebelum melanjutkan, pastikan bahwa restart tidak memengaruhi bisnis Anda.
Klik Save atau Save and Deploy.
Save: Setelah Anda mengklik tombol ini, sistem menyimpan pengaturan pipeline dan memicu perubahan kluster. Namun, pengaturan tersebut belum berlaku. Setelah Anda mengklik Simpan, halaman Pipelines muncul. Di halaman Pipelines, temukan pipeline yang dibuat dan klik Deploy Now di kolom Actions. Kemudian, sistem me-restart kluster Logstash untuk membuat pengaturan berlaku.
Save and Deploy: Setelah Anda mengklik tombol ini, sistem me-restart kluster Logstash untuk membuat pengaturan berlaku.
Langkah 3: Verifikasi hasil
Masuk ke konsol Kibana kluster Elasticsearch.
Untuk informasi lebih lanjut, lihat Masuk ke Konsol Kibana.
Di panel navigasi sisi kiri, klik Dev Tools.
Di tab Console pada halaman yang muncul, jalankan perintah berikut untuk melihat jumlah indeks yang menyimpan data yang disinkronkan:
GET drds_test/_count { "query": {"match_all": {}} }Jika perintah berhasil dijalankan, hasil berikut dikembalikan:
{ "count" : 3, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 } }Perbarui data di tabel dan masukkan data ke dalam tabel.
UPDATE food SET name='Chocolates',update_time=now() where id = 1; INSERT INTO food values(null,'Egg',now(),now());Lihat data yang diperbarui dan dimasukkan.
Tanyakan catatan data di mana nilai nama adalah Chocolates.
GET drds_test/_search { "query": { "match": { "name": "Chocolates" }} }Jika perintah berhasil dijalankan, hasil yang ditunjukkan pada gambar berikut dikembalikan.

Tanyakan semua data.
GET drds_test/_search { "query": { "match_all": {} } }Jika perintah berhasil dijalankan, hasil yang ditunjukkan pada gambar berikut dikembalikan.
