全部产品
Search
文档中心

Elasticsearch:Gunakan Logstash untuk menyinkronkan data dari ApsaraDB RDS for MySQL ke Elasticsearch

更新时间:Jul 06, 2025

Jika Anda ingin menyinkronkan data dari database ApsaraDB RDS for MySQL ke kluster Alibaba Cloud Elasticsearch, Anda dapat menggunakan plug-in logstash-input-jdbc dan fitur konfigurasi pipeline yang disediakan oleh Alibaba Cloud Logstash. Plug-in ini adalah bawaan dari Alibaba Cloud Logstash dan tidak dapat dihapus. Metode ini memungkinkan Anda menyinkronkan data penuh atau inkremental dari database ApsaraDB RDS for MySQL ke kluster Elasticsearch. Topik ini menjelaskan prosedur secara rinci.

Informasi latar belakang

Alibaba Cloud Logstash adalah alat pemrosesan data yang kuat yang dapat digunakan untuk mengumpulkan, mentransformasi, mengoptimalkan, dan menghasilkan data. Alibaba Cloud Logstash menyediakan plug-in logstash-input-jdbc, yang diinstal secara default pada kluster Logstash dan tidak dapat dihapus. Plug-in ini dapat meminta beberapa catatan data dalam ApsaraDB RDS for MySQL sekaligus dan menyinkronkannya ke Alibaba Cloud Elasticsearch. Plug-in ini menggunakan metode round-robin untuk mengidentifikasi catatan data yang baru saja dimasukkan atau diperbarui dalam ApsaraDB RDS for MySQL secara berkala dan menyinkronkannya ke Alibaba Cloud Elasticsearch. Untuk informasi lebih lanjut, lihat Gunakan Logstash dan JDBC untuk menyinkronkan data dari database relasional ke kluster Elasticsearch. Jika Anda ingin menyinkronkan data penuh dengan latensi beberapa detik atau meminta data tertentu pada satu waktu, Anda dapat menggunakan Alibaba Cloud Logstash.

Prasyarat

Disarankan untuk membuat instance dan kluster berikut dalam virtual private cloud (VPC) yang sama.

Catatan

Anda juga dapat menggunakan Logstash untuk menyinkronkan data dari instance ApsaraDB RDS for MySQL yang diterapkan di Internet. Sebelum melakukannya, Anda harus mengonfigurasi entri Source Network Address Translation (SNAT) untuk kluster Logstash, mengaktifkan alamat IP publik instance ApsaraDB RDS for MySQL, dan menambahkan alamat IP terkait ke daftar putih alamat IP instance tersebut. Untuk informasi lebih lanjut tentang cara mengonfigurasi entri SNAT, lihat Konfigurasikan gateway NAT untuk transmisi data melalui Internet. Untuk informasi lebih lanjut tentang cara menambahkan alamat IP ke daftar putih alamat IP instance ApsaraDB RDS for MySQL, lihat Konfigurasikan daftar putih alamat IP untuk instance ApsaraDB RDS for MySQL.

Batasan

  • Instance ApsaraDB RDS for MySQL, kluster Elasticsearch, dan kluster Logstash harus berada dalam zona waktu yang sama. Jika tidak, data terkait waktu mungkin memiliki offset zona waktu setelah penyinkronan.

  • Nilai bidang _id dalam kluster Elasticsearch harus sama dengan nilai bidang id dalam database ApsaraDB RDS for MySQL.

    Kondisi ini memastikan bahwa tugas sinkronisasi data dapat membuat pemetaan antara catatan data dalam database ApsaraDB RDS for MySQL dan dokumen dalam kluster Elasticsearch. Jika Anda memperbarui catatan data dalam database ApsaraDB RDS for MySQL, tugas sinkronisasi data akan menggunakan catatan data yang diperbarui untuk menimpa dokumen yang memiliki ID yang sama dalam kluster Elasticsearch.

    Catatan

    Pada dasarnya, operasi pembaruan dalam Elasticsearch menghapus dokumen asli dan mengindeks dokumen baru. Oleh karena itu, operasi penimpaan ini sama efisiennya dengan operasi pembaruan yang dilakukan oleh tugas sinkronisasi data.

  • Jika Anda memasukkan atau memperbarui catatan data dalam database ApsaraDB RDS for MySQL, catatan data tersebut harus berisi bidang yang menunjukkan waktu ketika catatan data dimasukkan atau diperbarui.

    Setiap kali plug-in logstash-input-jdbc melakukan round robin, plug-in mencatat waktu ketika catatan data terakhir dalam round robin dimasukkan atau diperbarui dalam database ApsaraDB RDS for MySQL. Logstash hanya menyinkronkan catatan data yang memenuhi persyaratan berikut dari database ApsaraDB RDS for MySQL: Waktu ketika catatan data dimasukkan atau diperbarui dalam database ApsaraDB RDS for MySQL lebih lambat daripada waktu ketika catatan data terakhir dalam round robin sebelumnya dimasukkan atau diperbarui dalam database ApsaraDB RDS for MySQL.

    Penting

    Jika Anda menghapus catatan data dalam database ApsaraDB RDS for MySQL, plug-in logstash-input-jdbc tidak dapat menghapus dokumen yang memiliki ID yang sama dari kluster Elasticsearch. Untuk menghapus dokumen dari kluster Elasticsearch, Anda harus menjalankan perintah terkait pada kluster Elasticsearch.

Prosedur

Langkah 1: Persiapan

  1. Aktifkan fitur Auto Indexing untuk kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Akses dan konfigurasikan kluster Elasticsearch.

  2. Dalam kluster Logstash, unggah driver SQL JDBC yang versinya kompatibel dengan versi instance ApsaraDB RDS for MySQL. Dalam contoh ini, driver mysql-connector-java-5.1.48.jar digunakan. Untuk informasi lebih lanjut, lihat Konfigurasikan pustaka pihak ketiga.

  3. Persiapkan data uji. Dalam contoh ini, pernyataan berikut digunakan untuk membuat tabel:

    CREATE table food (
      id int PRIMARY key AUTO_INCREMENT,
      name VARCHAR (32),
      insert_time DATETIME,
      update_time DATETIME
    );

    Pernyataan berikut digunakan untuk memasukkan data ke dalam tabel:

    INSERT INTO food values(null,'Cokelat',now(),now());
    INSERT INTO food values(null,'Yogurt',now(),now());
    INSERT INTO food values(null,'Sosis ham',now(),now());
  4. Tambahkan alamat IP node dalam kluster Logstash ke daftar putih instance ApsaraDB RDS for MySQL. Anda dapat memperoleh alamat IP pada halaman Informasi Dasar kluster Logstash.

Langkah 2: Konfigurasikan pipeline Logstash

  1. Pergi ke Halaman Kluster Logstash dari konsol Elasticsearch Alibaba Cloud.

  2. Navigasikan ke kluster yang diinginkan.

    1. Di bilah navigasi atas, pilih wilayah tempat kluster berada.

    2. Di halaman Logstash Clusters, temukan kluster dan klik ID-nya.

  3. Di panel navigasi sisi kiri halaman yang muncul, klik Pipelines.

  4. Di halaman Pipeline, klik Create Pipeline.

  5. Di halaman Create, masukkan ID pipeline di bidang Pipeline ID dan masukkan konfigurasi yang diperlukan di bidang Config Settings.

    Dalam contoh ini, konfigurasi berikut dimasukkan di bidang Config Settings:

    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar"
        jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<Nama Database>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
        jdbc_user => "xxxxx"
        jdbc_password => "xxxx"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select * from food where update_time >= :sql_last_value"
        schedule => "* * * * *"
        record_last_run => true
        last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
        clean_run => false
        tracking_column_type => "timestamp"
        use_column_value => true
        tracking_column => "update_time"
      }
    }
    filter {
    }
    output {
     elasticsearch {
        hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200"
        index => "rds_es_dxhtest_datetime"
        user => "elastic"
        password => "xxxxxxx"
        document_id => "%{id}"
      }
    }
    Catatan

    Anda harus mengganti <Logstash cluster ID> dalam kode sebelumnya dengan ID kluster Logstash yang Anda gunakan. Untuk informasi lebih lanjut tentang cara mendapatkan ID, lihat Lihat informasi dasar kluster.

    Tabel 1. Deskripsi konfigurasi di bidang Config Settings

    Parameter

    Deskripsi

    input

    Menentukan sumber data input. Untuk informasi lebih lanjut tentang jenis sumber data yang didukung, lihat Plug-in input. Dalam contoh ini, sumber data input yang terhubung menggunakan JDBC digunakan. Untuk informasi lebih lanjut tentang parameter terkait, lihat Parameter dalam bagian input.

    filter

    Menentukan plug-in yang digunakan untuk memfilter data input. Untuk informasi lebih lanjut tentang plug-in yang didukung, lihat Filter plugins.

    output

    Menentukan sumber data output. Untuk informasi lebih lanjut tentang jenis sumber data yang didukung, lihat Output plugins. Dalam contoh ini, data dalam database ApsaraDB RDS for MySQL disinkronkan ke kluster Elasticsearch. Oleh karena itu, informasi kluster Elasticsearch dikonfigurasi di bagian output. Untuk informasi lebih lanjut tentang parameter terkait, lihat Langkah 3: Buat dan jalankan pipeline Logstash.

    Penting

    Jika parameter file_extend ditentukan dalam konfigurasi output pipeline, Anda harus memastikan bahwa plug-in logstash-output-file_extend diinstal untuk kluster Logstash. Untuk informasi lebih lanjut, lihat Instal atau hapus plug-in Logstash.

    Tabel 2. Parameter dalam bagian input

    Parameter

    Deskripsi

    jdbc_driver_class

    Kelas driver JDBC.

    jdbc_driver_library

    File driver yang digunakan untuk koneksi berbasis JDBC ke database ApsaraDB RDS for MySQL. Konfigurasikan parameter ini dalam format /ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/<Nama file driver>. Anda harus mengunggah file driver yang diinginkan di konsol Elasticsearch terlebih dahulu. Untuk informasi lebih lanjut tentang file driver yang didukung oleh Logstash dan cara mengunggah file driver, lihat Konfigurasikan pustaka pihak ketiga.

    jdbc_connection_string

    String koneksi yang digunakan untuk menghubungkan ke database ApsaraDB RDS for MySQL. String koneksi berisi endpoint dan nomor port instance ApsaraDB RDS for MySQL terkait serta nama database ApsaraDB RDS for MySQL. Konfigurasikan parameter ini dalam format berikut: jdbc:mysql://<Endpoint instance ApsaraDB RDS for MySQL>:<Nomor port instance ApsaraDB RDS for MySQL>/<Nama database ApsaraDB RDS for MySQL>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false.

    • <Endpoint instance ApsaraDB RDS for MySQL>: Anda harus menentukan endpoint internal instance ApsaraDB RDS for MySQL.

    • Catatan

      Jika Anda ingin menggunakan endpoint publik instance ApsaraDB RDS for MySQL, Anda harus mengonfigurasi gateway Network Address Translation (NAT) untuk kluster Logstash dan mengonfigurasi parameter dalam format jdbc:mysql://<Endpoint publik instance ApsaraDB RDS for MySQL>:<Nomor port instance ApsaraDB RDS for MySQL> untuk mengaktifkan kluster Logstash terhubung ke Internet. Untuk informasi lebih lanjut, lihat Konfigurasikan gateway NAT untuk transmisi data melalui Internet.

    • <Nomor port instance ApsaraDB RDS for MySQL>: Nomor port harus sama dengan nomor port lalu lintas keluar instance ApsaraDB RDS for MySQL. Pada umumnya, nomor port adalah 3306.

    jdbc_user

    Nama pengguna yang digunakan untuk mengakses database ApsaraDB RDS for MySQL.

    jdbc_password

    Kata sandi yang digunakan untuk mengakses database ApsaraDB RDS for MySQL.

    jdbc_paging_enabled

    Menentukan apakah paging diaktifkan. Nilai default: false.

    jdbc_page_size

    Jumlah entri per halaman.

    statement

    Pernyataan SQL yang digunakan untuk meminta data dari database ApsaraDB RDS for MySQL. Jika Anda ingin meminta data dari beberapa tabel dalam database ApsaraDB RDS for MySQL, Anda dapat menggunakan pernyataan JOIN.

    Catatan

    Nilai sql_last_value digunakan untuk menghitung baris yang akan dipertanyakan. Secara default, parameter ini diatur ke Kamis, 1 Januari 1970. Untuk informasi lebih lanjut, lihat Jdbc input plugin.

    schedule

    Interval di mana pernyataan SQL dieksekusi. Nilai * * * * * menunjukkan bahwa pernyataan SQL dieksekusi setiap menit. Atur parameter ini ke ekspresi cron yang didukung oleh Rufus.

    record_last_run

    Menentukan apakah hasil eksekusi terakhir dicatat. Jika parameter ini diatur ke true, nilai tracking_column dalam hasil eksekusi terakhir disimpan dalam file di jalur yang ditentukan menggunakan parameter last_run_metadata_path.

    last_run_metadata_path

    Jalur file yang berisi waktu eksekusi terakhir. Jalur file disediakan di backend. Jalurnya dalam format /ssd/1/<Logstash cluster ID>/logstash/data/. Setelah Anda menentukan jalur, Logstash secara otomatis menghasilkan file di jalur tersebut, tetapi Anda tidak dapat melihat data dalam file tersebut.

    Catatan

    Kami merekomendasikan agar Anda mengonfigurasi parameter ini dalam format /ssd/1/<Logstash cluster ID>/logstash/data/ saat Anda mengonfigurasi pipeline Logstash. Jika Anda mengonfigurasi parameter ini dalam format lain, kondisi rekaman untuk sinkronisasi tidak dapat disimpan dalam file di jalur yang ditentukan menggunakan parameter last_run_metadata_path. Kegagalan penyimpanan disebabkan oleh izin yang tidak cukup.

    clean_run

    Menentukan apakah jalur yang ditentukan menggunakan parameter last_run_metadata_path dibersihkan. Nilai default: false. Jika parameter ini diatur ke true, setiap query dimulai dari entri pertama dalam database.

    use_column_value

    Menentukan apakah nilai kolom tertentu dicatat. Jika parameter ini diatur ke true, sistem mencatat nilai terbaru dari kolom yang ditentukan menggunakan tracking_column dan menentukan catatan yang perlu diperbarui dalam file berdasarkan nilai tracking_column ketika pernyataan SQL dijalankan untuk kali berikutnya.

    tracking_column_type

    Tipe kolom yang nilainya ingin Anda lacak. Nilai default: numeric.

    tracking_column

    Kolom yang nilainya ingin Anda lacak. Nilai-nilai tersebut harus diurutkan dalam urutan menaik. Pada umumnya, kolom ini adalah kunci utama.

    Penting
    • Konfigurasi sebelumnya didasarkan pada data uji. Anda dapat mengonfigurasi pipeline berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut tentang parameter lain yang didukung oleh plug-in input, lihat Logstash Jdbc input plugin.

    • Jika konfigurasi Anda berisi parameter serupa dengan last_run_metadata_path, jalur file harus disediakan oleh Logstash. Jalur dalam format /ssd/1/<Logstash cluster ID>/logstash/data/ disediakan di backend dan tersedia untuk pengujian. Sistem tidak akan menghapus data dalam jalur ini. Pastikan disk Anda memiliki ruang penyimpanan yang cukup saat menggunakan jalur ini. Setelah Anda menentukan jalur, Logstash secara otomatis menghasilkan file di jalur tersebut, tetapi Anda tidak dapat melihat data dalam file tersebut.

    • Untuk alasan keamanan, jika Anda menentukan driver JDBC saat mengonfigurasi pipeline, Anda harus menambahkan allowLoadLocalInfile=false&autoDeserialize=false di akhir parameter jdbc_connection_string, seperti jdbc_connection_string => "jdbc:mysql://xxx.drds.aliyuncs.com:3306/<Nama Database>?allowLoadLocalInfile=false&autoDeserialize=false". Jika tidak, ketika Anda menambahkan file konfigurasi untuk pipeline Logstash, sistem akan menampilkan pesan kesalahan yang menunjukkan kegagalan pemeriksaan.

    Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter di bidang Config Settings, lihat File konfigurasi Logstash.

  6. Klik Next untuk mengonfigurasi parameter pipeline.

    管道参数配置

    Parameter

    Deskripsi

    Pipeline Workers

    Jumlah thread pekerja yang menjalankan plug-in filter dan output pipeline secara paralel. Jika terdapat backlog peristiwa atau beberapa sumber daya CPU tidak digunakan, kami merekomendasikan agar Anda meningkatkan jumlah thread untuk memaksimalkan pemanfaatan CPU. Nilai default parameter ini adalah jumlah vCPU.

    Pipeline Batch Size

    Jumlah maksimum peristiwa yang dapat dikumpulkan oleh satu thread pekerja dari plug-in input sebelum mencoba menjalankan plug-in filter dan output. Jika Anda mengatur parameter ini ke nilai besar, satu thread pekerja dapat mengumpulkan lebih banyak peristiwa tetapi mengonsumsi memori yang lebih besar. Jika Anda ingin memastikan bahwa thread pekerja memiliki memori yang cukup untuk mengumpulkan lebih banyak peristiwa, tentukan variabel LS_HEAP_SIZE untuk meningkatkan ukuran heap Java virtual machine (JVM). Nilai default: 125.

    Pipeline Batch Delay

    Waktu tunggu untuk sebuah peristiwa. Waktu ini terjadi sebelum Anda menetapkan batch kecil ke thread pekerja pipeline dan setelah Anda membuat tugas batch untuk peristiwa pipeline. Nilai default: 50. Unit: milidetik.

    Queue Type

    Model antrian internal untuk penyangga peristiwa. Nilai valid:

    • MEMORY: antrian berbasis memori tradisional. Ini adalah nilai default.

    • PERSISTED: antrian berbasis disk ACKed, yaitu antrian persisten.

    Queue Max Bytes

    Ukuran data maksimum untuk antrian. Unit: MB. Nilai valid: bilangan bulat mulai dari 1 hingga 2<sup>53</sup> - 1. Nilai default: 1024.

    Catatan

    Nilai ini harus kurang dari kapasitas total disk Anda.

    Queue Checkpoint Writes

    Jumlah maksimum peristiwa yang ditulis sebelum checkpoint diberlakukan ketika antrian persisten diaktifkan. Nilai 0 menunjukkan tidak ada batas. Nilai default: 1024.

    Peringatan

    Setelah Anda mengonfigurasi parameter, Anda harus menyimpan pengaturan dan menerapkan pipeline. Ini memicu restart kluster Logstash. Sebelum Anda melanjutkan, pastikan bahwa restart tidak memengaruhi bisnis Anda.

  7. Klik Save atau Save and Deploy.

    • Save: Setelah Anda mengklik tombol ini, sistem menyimpan pengaturan pipeline dan memicu perubahan kluster. Namun, pengaturan tersebut tidak langsung berlaku. Setelah Anda mengklik Simpan, halaman Pipelines muncul. Di halaman Pipelines, temukan pipeline yang dibuat dan klik Deploy Now di kolom Actions. Kemudian, sistem me-restart kluster Logstash untuk membuat pengaturan berlaku.

    • Save and Deploy: Setelah Anda mengklik tombol ini, sistem me-restart kluster Logstash untuk membuat pengaturan berlaku.

Langkah 3: Verifikasi hasil

  1. Masuk ke konsol Kibana kluster Elasticsearch.

    Untuk informasi lebih lanjut, lihat Masuk ke konsol Kibana.

  2. Di pojok kiri atas, klik ikon 菜单.png dan pilih Management > Dev Tools.

  3. Di tab Console halaman yang muncul, jalankan perintah berikut untuk melihat jumlah indeks yang menyimpan data yang disinkronkan:

    GET rds_es_dxhtest_datetime/_count
    {
      "query": {"match_all": {}}
    }

    Jika perintah berhasil dijalankan, hasil berikut dikembalikan:

    {
      "count" : 3,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      }
    }
  4. Perbarui data dalam tabel MySQL dan masukkan data ke dalam tabel.

    UPDATE food SET name='Cokelat',update_time=now() where id = 1;
    INSERT INTO food values(null,'Telur',now(),now());
  5. Lihat data yang diperbarui dan dimasukkan.

    • Permintaan catatan data di mana nilai nama adalah Cokelat.

      GET rds_es_dxhtest_datetime/_search
      {
        "query": {
          "match": {
            "name": "Cokelat"
         }}
      }

      Jika perintah berhasil dijalankan, hasil berikut dikembalikan:

      返回结果

    • Permintaan semua data.

      GET rds_es_dxhtest_datetime/_search
      {
        "query": {
          "match_all": {}
        }
      }

      Jika perintah berhasil dijalankan, hasil yang ditunjukkan pada gambar berikut dikembalikan.

      返回结果

Tanya Jawab Umum

  • P: Apa yang harus saya lakukan jika tugas sinkronisasi data saya gagal karena pipeline macet dalam status menginisialisasi, data sebelum dan sesudah sinkronisasi tidak konsisten, atau koneksi ke database gagal?

    A: Periksa apakah log kluster Logstash Anda berisi informasi kesalahan dan identifikasi penyebabnya berdasarkan informasi kesalahan tersebut. Untuk informasi lebih lanjut, lihat Kueri log. Tabel berikut menjelaskan penyebab umum kesalahan dan solusi untuk kesalahan tersebut.

    Catatan

    Jika operasi pembaruan sedang dilakukan pada kluster Logstash Anda saat Anda melakukan operasi yang dijelaskan dalam solusi berikut, jeda operasi pembaruan dengan merujuk ke Lihat kemajuan tugas kluster. Setelah operasi yang dijelaskan dalam solusi selesai, sistem me-restart kluster Logstash dan melanjutkan operasi pembaruan.

    Penyebab

    Solusi

    Alamat IP node dalam kluster Logstash belum ditambahkan ke daftar putih instance ApsaraDB RDS for MySQL.

    Tambahkan alamat IP node dalam kluster Logstash ke daftar putih instance ApsaraDB RDS for MySQL. Untuk informasi lebih lanjut, lihat Gunakan klien database atau CLI untuk terhubung ke instance ApsaraDB RDS for MySQL.

    Catatan

    Untuk informasi lebih lanjut tentang cara mendapatkan alamat IP node dalam kluster Logstash, lihat Lihat informasi dasar kluster.

    Anda menggunakan kluster Logstash untuk menyinkronkan data dari database MySQL yang dikelola sendiri yang di-hosting pada instance ECS, tetapi alamat IP privat dan port internal node dalam kluster belum ditambahkan ke grup keamanan instance ECS.

    Tambahkan alamat IP privat dan port internal node dalam kluster ke grup keamanan instance ECS. Untuk informasi lebih lanjut, lihat Tambahkan aturan grup keamanan.

    Catatan

    Untuk informasi lebih lanjut tentang cara mendapatkan alamat IP dan port node dalam kluster Logstash, lihat Lihat informasi dasar kluster.

    Kluster Elasticsearch tidak berada dalam VPC yang sama dengan kluster Logstash.

    Gunakan salah satu solusi berikut:

    Endpoint instance ApsaraDB RDS for MySQL salah, dan nomor port instance bukan 3306.

    Dapatkan endpoint dan nomor port yang benar. Untuk informasi lebih lanjut, lihat Lihat dan kelola endpoint dan port instance. Kemudian, ganti endpoint dan nomor port dalam nilai parameter jdbc_connection_string dengan endpoint dan nomor port yang Anda dapatkan.

    Penting

    <Endpoint instance ApsaraDB RDS for MySQL>: Anda harus menentukan endpoint internal instance ApsaraDB RDS for MySQL. Jika Anda ingin menggunakan endpoint publik instance ApsaraDB RDS for MySQL, Anda harus mengonfigurasi gateway NAT untuk kluster Logstash untuk mengaktifkan kluster Logstash terhubung ke Internet. Untuk informasi lebih lanjut, lihat Konfigurasikan gateway NAT untuk transmisi data melalui Internet.

    Fitur Auto Indexing dinonaktifkan untuk kluster Elasticsearch.

    Aktifkan fitur Auto Indexing untuk kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Konfigurasikan file YML.

    Beban kluster Elasticsearch atau Logstash terlalu tinggi.

    Tingkatkan konfigurasi kluster Elasticsearch atau Logstash. Untuk informasi lebih lanjut, lihat Tingkatkan konfigurasi kluster.

    Catatan

    Jika beban kluster Elasticsearch terlalu tinggi, Anda dapat melihat data pemantauan yang dikumpulkan berdasarkan metrik di konsol Elasticsearch untuk mendapatkan informasi beban kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Metrik dan saran penanganan anomali. Jika beban kluster Logstash terlalu tinggi, Anda dapat mengaktifkan fitur X-Pack Monitoring untuk kluster Logstash, gunakan fitur tersebut untuk memantau kluster Logstash, dan kemudian lihat data pemantauan. Untuk informasi lebih lanjut, lihat Aktifkan fitur X-Pack Monitoring.

    Tidak ada file driver yang digunakan untuk koneksi berbasis JDBC ke database ApsaraDB RDS for MySQL yang diunggah.

    Unggah file driver. Untuk informasi lebih lanjut, lihat Konfigurasikan pustaka pihak ketiga.

    file_extend ditentukan dalam konfigurasi pipeline. Namun, plug-in logstash-output-file_extend belum diinstal.

    Gunakan salah satu solusi berikut:

    • Instal plug-in logstash-output-file_extend. Untuk informasi lebih lanjut, lihat Instal dan hapus plug-in.

    • Hapus parameter file_extend dari konfigurasi pipeline.

    Untuk informasi lebih lanjut tentang penyebab dan solusi untuk masalah ini, lihat FAQ tentang transfer data menggunakan Logstash.

  • P: Bagaimana cara mengonfigurasi beberapa koneksi JDBC ke database sumber dalam konfigurasi input pipeline?

    A: Anda dapat mendefinisikan beberapa sumber data JDBC dalam konfigurasi input pipeline dan menentukan pernyataan query untuk setiap tabel dalam parameter statement. Kode berikut memberikan contoh:

    input {
        jdbc {
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar"
          jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<Nama Database>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
          jdbc_user => "xxxxx"
          jdbc_password => "xxxx"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement => "select * from tableA where update_time >= :sql_last_value"
          schedule => "* * * * *"
          record_last_run => true
          last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
          clean_run => false
          tracking_column_type => "timestamp"
          use_column_value => true
          tracking_column => "update_time"
          type => "A"
        }
        jdbc {
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar"
          jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<Nama Database>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
          jdbc_user => "xxxxx"
          jdbc_password => "xxxx"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement => "select * from tableB where update_time >= :sql_last_value"
          schedule => "* * * * *"
          record_last_run => true
          last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
          clean_run => false
          tracking_column_type => "timestamp"
          use_column_value => true
          tracking_column => "update_time"
          type => "B"
        }
    }
    output {
        if[type] == "A" {
            elasticsearch {
                hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200"
                index => "rds_es_dxhtest_datetime_A"
                user => "elastic"
                password => "xxxxxxx"
                document_id => "%{id}"
            }
        }
        if[type] == "B" {
            elasticsearch {            
                hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200"
                index => "rds_es_dxhtest_datetime_B"
                user => "elastic"
                password => "xxxxxxx"
                document_id => "%{id}"
            }
        }
    }

    Dalam contoh sebelumnya, parameter type ditambahkan ke konfigurasi jdbc. Parameter ini membantu Anda mendefinisikan kondisi dalam konfigurasi output dan menyinkronkan data dari tabel yang berbeda ke indeks yang berbeda.