全部产品
Search
文档中心

Elasticsearch:Migrasi data dari Elasticsearch yang dikelola sendiri ke Alibaba Cloud Elasticsearch menggunakan Logstash

更新时间:Mar 01, 2026

Deploy instans Logstash yang dikelola sendiri pada instans Elastic Compute Service (ECS) untuk melakukan migrasi data penuh atau inkremental dari kluster Elasticsearch yang dikelola sendiri ke kluster Alibaba Cloud Elasticsearch.

Pilih metode migrasi yang tepat

MetodePaling Cocok untukTrade-off
LogstashSinkronisasi inkremental, replikasi berkelanjutan, dan migrasi indeks selektifMemerlukan instans Logstash yang aktif; menimbulkan overhead pemrosesan pipeline
SnapshotsMigrasi penuh satu kali untuk dataset besarMemerlukan penyimpanan bersama; hanya memulihkan indeks lengkap (tidak mendukung sinkronisasi inkremental tingkat dokumen)
Reindex APIDataset kecil atau migrasi lintas versiLebih lambat untuk volume besar; bergantung pada ketersediaan jaringan

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Kluster Elasticsearch yang dikelola sendiri (sumber) berisi data yang akan dimigrasikan

  • Kluster Alibaba Cloud Elasticsearch (tujuan) yang telah dibuat dan berjalan. Untuk informasi selengkapnya, lihat Buat kluster Alibaba Cloud Elasticsearch.

  • Instans ECS dalam Virtual Private Cloud (VPC) yang sama dengan kluster tujuan, dilengkapi alamat IPv4 publik

  • Logstash yang dikelola sendiri yang diinstal pada instans ECS

  • Konektivitas jaringan yang telah diverifikasi antara Logstash dan kedua kluster Elasticsearch

  • Aturan masuk security group yang mengizinkan lalu lintas pada port 5601 (untuk akses Kibana)

Penting

Instans ECS harus berada dalam VPC yang sama dengan kluster Alibaba Cloud Elasticsearch. Logstash harus dapat terhubung ke kedua kluster sumber dan tujuan.

Langkah 1: Siapkan lingkungan

Bagian ini menjelaskan cara men-deploy kluster Elasticsearch yang dikelola sendiri, Kibana, dan Logstash pada satu instans ECS. Jika komponen-komponen tersebut sudah berjalan, lanjutkan ke Langkah 2.

Spesifikasi lingkungan contoh

Tabel berikut menjelaskan konfigurasi contoh yang digunakan dalam dokumen ini.

Kluster Alibaba Cloud Elasticsearch

KonfigurasiNilai
RegionTiongkok (Hangzhou)
Edisi dan versiV7.10.0, Edisi Standar
Zona dan nodeTiga zona, tiga node data
Spesifikasi node tunggal4 vCPU, memori 16 GiB, Enhanced SSD (ESSD) dengan penyimpanan 100 GiB

Instans ECS

KonfigurasiNilai
RegionTiongkok (Hangzhou)
Spesifikasi4 vCPU, memori 16 GiB
ImageCentOS 7.9 64-bit (gambar publik)
Disk sistemESSD, 100 GiB
JaringanVPC yang sama dengan kluster Alibaba Cloud Elasticsearch. Assign Public IPv4 Address dipilih. Penagihan pay-by-traffic, bandwidth puncak 100 Mbit/s
Security groupAturan inbound yang mengizinkan traffic pada port 5601 (Kibana). Tambahkan alamat IP klien Anda sebagai objek otorisasi
Penting
  • Jika klien Anda berada di LAN rumah atau kantor, tambahkan alamat IP egress publik, bukan alamat IP lokal. Kunjungi cip.cc untuk menemukan IP publik Anda.

  • Menambahkan 0.0.0.0/0 sebagai objek otorisasi mengizinkan akses dari semua alamat IPv4 publik. Hindari hal ini di lingkungan produksi karena risiko keamanan.

Untuk informasi lebih lanjut tentang pembuatan instans ECS, lihat Buat instans di tab Custom Launch.

Buat kluster Alibaba Cloud Elasticsearch

Buat kluster dengan spesifikasi yang tercantum di atas atau sesuaikan dengan kebutuhan Anda. Untuk petunjuk detail, lihat Buat kluster Alibaba Cloud Elasticsearch.

Deploy Elasticsearch yang dikelola sendiri

Contoh ini mendeploy kluster Elasticsearch 7.6.2 single-node pada instans ECS.

  1. Terhubung ke instans ECS. Untuk informasi selengkapnya, lihat Terhubung ke Instans Linux Menggunakan Kata Sandi atau Kunci.

  2. Buat pengguna bernama elastic sebagai root user:

       useradd elastic
  3. Tetapkan password untuk pengguna elastic: Masukkan dan konfirmasi password saat diminta.

       passwd elastic
  4. Beralih ke pengguna elastic:

       su -l elastic
  5. Unduh dan ekstrak paket Elasticsearch:

       wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
       tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
  6. Jalankan Elasticsearch:

       cd elasticsearch-7.6.2
       ./bin/elasticsearch -d
  7. Verifikasi bahwa Elasticsearch sedang berjalan: Respons yang berhasil berisi nomor versi dan pesan "You Know, for Search".

       cd ~
       curl localhost:9200

Deploy Kibana dan tambahkan data sampel

Contoh ini mendeploy Kibana 7.6.2 pada instans ECS yang sama.

Jalankan Kibana sebagai pengguna biasa, bukan root.
  1. Unduh dan ekstrak paket Kibana:

       wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
       tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
  2. Konfigurasikan Kibana agar menerima koneksi dari semua alamat IP. Buka file kibana.yml di direktori config/ dan tambahkan baris berikut:

       cd kibana-7.6.2-linux-x86_64
       vi config/kibana.yml
       server.host: "0.0.0.0"
  3. Jalankan Kibana:

       sudo nohup ./bin/kibana &
  4. Buka konsol Kibana di browser Anda:

       http://<ECS-public-IP>:5601/app/kibana#/home
  5. Pada halaman utama Kibana, klik Try our sample data. Pada tab Sample data, klik Add data di kartu Sample web logs untuk memuat data uji.

Deploy Logstash

Contoh ini mendeploy Logstash 7.10.0 pada instans ECS yang sama.

Jalankan Logstash sebagai pengguna biasa, bukan root.
  1. Unduh dan ekstrak paket Logstash:

       cd ~
       wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
       tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz
  2. Tingkatkan memori heap JVM. Ukuran heap default adalah 1 GiB. Tetapkan ke nilai yang sesuai dengan spesifikasi instans ECS Anda untuk mempercepat migrasi data. Misalnya, tetapkan ukuran heap menjadi 8 GiB untuk instans berkapasitas 16 GiB:

       cd logstash-7.10.0
       sudo vi config/jvm.options
       -Xms8g
       -Xmx8g
  3. Tingkatkan ukuran batch. Edit file konfigurasi pipelines.yml dan ubah pipeline.batch.size dari nilai default 125 menjadi 5000. Hal ini memungkinkan Logstash menulis 5 MiB hingga 15 MiB data per batch, yang mempercepat migrasi:

       vi config/pipelines.yml
  4. Verifikasi bahwa Logstash berjalan dengan benar: Ketik Hello world! dan tekan Enter. Jika Logstash berfungsi, ia akan mengembalikan output Hello world! ke terminal. Tekan Ctrl+C untuk menghentikan.

       bin/logstash -e 'input { stdin { } } output { stdout {} }'

Langkah 2: (Opsional) Migrasi metadata indeks

Saat Logstash melakukan migrasi data, fitur Auto Indexing pada kluster Alibaba Cloud Elasticsearch secara otomatis membuat indeks. Namun, indeks yang dibuat otomatis mungkin memiliki pengaturan dan pemetaan berbeda dari indeks sumber. Untuk mempertahankan struktur indeks asli, buat indeks tersebut secara manual pada kluster tujuan sebelum migrasi data.

Skrip Python berikut membaca pengaturan dan pemetaan indeks dari kluster sumber, lalu membuat indeks yang sesuai pada kluster tujuan.

  1. Hubungkan ke instans ECS sebagai pengguna biasa.

  2. Buat file skrip Python:

       sudo vi indiceCreate.py
  3. Tempel skrip berikut. Ganti nilai host, username, dan password untuk kluster sumber dan tujuan:Perilaku utama skrip ini:

    • Mempertahankan jumlah shard primary dari indeks sumber.

    • Menetapkan jumlah shard replika ke 0 (DEFAULT_REPLICAS = 0) untuk mempercepat pengindeksan awal. Tingkatkan jumlah replika setelah migrasi.

    • Melewati indeks sistem (nama yang diawali dengan .). Buat ulang secara manual jika diperlukan.

       #!/usr/bin/python
       # -*- coding: UTF-8 -*-
       # File name: indiceCreate.py
       import sys
       import base64
       import time
       import httplib
       import json
       ## Kluster Elasticsearch sumber
       oldClusterHost = "<source-es-host>:9200"
       ## Username kluster sumber (kosongkan jika tidak diperlukan)
       oldClusterUserName = "elastic"
       ## Password kluster sumber (kosongkan jika tidak diperlukan)
       oldClusterPassword = "<source-es-password>"
       ## Kluster Elasticsearch tujuan (temukan di halaman Basic Information)
       newClusterHost = "<destination-es-endpoint>:9200"
       ## Username kluster tujuan
       newClusterUser = "elastic"
       ## Password kluster tujuan
       newClusterPassword = "<destination-es-password>"
       DEFAULT_REPLICAS = 0
       def httpRequest(method, host, endpoint, params="", username="", password=""):
           conn = httplib.HTTPConnection(host)
           headers = {}
           if (username != "") :
               'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
               base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
               headers["Authorization"] = "Basic %s" % base64string;
           if "GET" == method:
               headers["Content-Type"] = "application/x-www-form-urlencoded"
               conn.request(method=method, url=endpoint, headers=headers)
           else :
               headers["Content-Type"] = "application/json"
               conn.request(method=method, url=endpoint, body=params, headers=headers)
           response = conn.getresponse()
           res = response.read()
           return res
       def httpGet(host, endpoint, username="", password=""):
           return httpRequest("GET", host, endpoint, "", username, password)
       def httpPost(host, endpoint, params, username="", password=""):
           return httpRequest("POST", host, endpoint, params, username, password)
       def httpPut(host, endpoint, params, username="", password=""):
           return httpRequest("PUT", host, endpoint, params, username, password)
       def getIndices(host, username="", password=""):
           endpoint = "/_cat/indices"
           indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
           indicesList = indicesResult.split("\n")
           indexList = []
           for indices in indicesList:
               if (indices.find("open") > 0):
                   indexList.append(indices.split()[2])
           return indexList
       def getSettings(index, host, username="", password=""):
           endpoint = "/" + index + "/_settings"
           indexSettings = httpGet(host, endpoint, username, password)
           print (index + "  Pengaturan asli: \n" + indexSettings)
           settingsDict = json.loads(indexSettings)
           ## Jumlah shard primary sesuai dengan indeks sumber
           number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
           ## Jumlah replika default adalah 0
           number_of_replicas = DEFAULT_REPLICAS
           newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
           return newSetting
       def getMapping(index, host, username="", password=""):
           endpoint = "/" + index + "/_mapping"
           indexMapping = httpGet(host, endpoint, username, password)
           print (index + " Pemetaan asli: \n" + indexMapping)
           mappingDict = json.loads(indexMapping)
           mappings = json.dumps(mappingDict[index]["mappings"])
           newMapping = "\"mappings\" : " + mappings
           return newMapping
       def createIndexStatement(oldIndexName):
           settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
           mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
           createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
           return createstatement
       def createIndex(oldIndexName, newIndexName=""):
           if (newIndexName == "") :
               newIndexName = oldIndexName
           createstatement = createIndexStatement(oldIndexName)
           print ("Indeks baru " + newIndexName + " Pengaturan dan pemetaan indeks: \n" + createstatement)
           endpoint = "/" + newIndexName
           createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
           print ("Indeks baru " + newIndexName + " Hasil pembuatan: " + createResult)
       ## main
       indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
       systemIndex = []
       for index in indexList:
           if (index.startswith(".")):
               systemIndex.append(index)
           else :
               createIndex(index, index)
       if (len(systemIndex) > 0) :
           for index in systemIndex:
               print (index + " Kemungkinan merupakan indeks sistem dan tidak akan dibuat ulang. Anda dapat membuat ulang indeks tersebut secara manual sesuai kebutuhan bisnis.")
  4. Jalankan skrip:

       sudo /usr/bin/python indiceCreate.py
  5. Verifikasi indeks yang telah dibuat. Masuk ke konsol Kibana kluster Alibaba Cloud Elasticsearch (lihat Masuk ke konsol Kibana) dan jalankan:

       GET /_cat/indices?v

Langkah 3: Migrasi data penuh

Lakukan migrasi data penuh terlebih dahulu jika terjadi penulisan atau pembaruan data secara terus-menerus pada kluster sumber. Hal ini menetapkan garis dasar lengkap pada kluster tujuan sebelum memulai sinkronisasi inkremental.

Untuk meningkatkan akurasi data, buat beberapa pipeline Logstash dan migrasi indeks yang berbeda secara terpisah.
Format konfigurasi Logstash berubah mulai versi 8.5. Bagian ini menyediakan konfigurasi untuk versi 7.x dan 8.x.
  1. Hubungkan ke instans ECS.

  2. Buat file konfigurasi Logstash:

       cd logstash-7.10.0/config
       vi es2es_all.conf
  3. Tambahkan konfigurasi sesuai versi Logstash Anda.

    Catatan
    • Parameter konfigurasi untuk Logstash 8.5 berbeda. Bagian ini menyediakan konfigurasi untuk versi 7.x dan 8.x.

    • Untuk meningkatkan akurasi data, buat beberapa pipeline Logstash dan migrasi indeks yang berbeda secara terpisah.

    Konfigurasi Logstash 7.x

       input{
           elasticsearch{
               # Host Elasticsearch sumber
               hosts =>  ["http://<source-es-host>:9200"]
               # Kredensial kluster sumber
               user => "<source-es-username>"
               password => "<source-es-password>"
               # Indeks yang akan dimigrasikan (mendukung wildcard)
               index => "kibana_sample_data_*"
               # Pengaturan performa
               docinfo=>true
               slices => 5
               size => 5000
           }
       }
    
       filter {
         # Hapus field yang ditambahkan oleh Logstash
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
    
       output{
           elasticsearch{
               # Titik akhir kluster tujuan (temukan di halaman Basic Information)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Kredensial kluster tujuan
               user => "elastic"
               password => "<destination-es-password>"
               # Pertahankan nama indeks asli
               index => "%{[@metadata][_index]}"
               # Pertahankan tipe dokumen asli (hanya untuk 7.x)
               document_type => "%{[@metadata][_type]}"
               # Pertahankan ID dokumen asli (hapus untuk performa lebih baik)
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }

    Konfigurasi Logstash 8.x

       input{
           elasticsearch{
               # Host Elasticsearch sumber
               hosts =>  ["http://<source-es-host>:9200"]
               # Kredensial kluster sumber
               user => "elastic"
               password => "<source-es-password>"
               # Indeks yang akan dimigrasikan
               index => "<index-name>"
               # Pengaturan performa
               docinfo => true
               size => 10000
               docinfo_target => "[@metadata]"
           }
       }
    
       filter {
         # Hapus field yang ditambahkan oleh Logstash
         mutate {
           remove_field => ["@timestamp","@version"]
         }
       }
    
       output{
           elasticsearch{
               # Titik akhir kluster tujuan (temukan di halaman Basic Information)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Kredensial kluster tujuan
               user => "elastic"
               password => "<destination-es-password>"
               # Pertahankan nama indeks asli
               index => "%{[@metadata][_index]}"
               # Pertahankan ID dokumen asli (hapus untuk performa lebih baik)
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }

    Cegah data duplikat: Plugin input Elasticsearch membaca semua data lalu menghentikan proses Logstash. Logstash secara otomatis me-restart proses tersebut, yang dapat menyebabkan penulisan duplikat dengan pipeline tunggal. Untuk mencegah hal ini, gunakan parameter schedule dengan ekspresi cron untuk menjalankan pipeline pada waktu tertentu. Misalnya, untuk menjalankan pipeline sekali pada pukul 13.20 tanggal 5 Maret. Untuk informasi lebih lanjut tentang sintaksis cron, lihat Scheduling dalam dokumentasi Logstash.

    Pada Logstash 8.x, pengaturan document_type dihapus dan docinfo_target => "[@metadata]" wajib digunakan.
       schedule => "20 13 5 3 *"
  4. Arahkan ke direktori instalasi Logstash:

       cd ~/logstash-7.10.0/config
       vi es2es_kibana_sample_data_logs.conf
  5. Jalankan migrasi:

       nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &

Langkah 4: Migrasi data inkremental

Setelah migrasi data penuh selesai, atur sinkronisasi inkremental untuk menangkap perubahan yang sedang berlangsung. Pipeline inkremental menggunakan kueri rentang waktu dan jadwal cron guna mengambil data baru dari kluster sumber secara berkala.

Jalankan langkah ini sebagai pengguna biasa.
  1. Buat file konfigurasi migrasi inkremental:

       cd ~/logstash-7.10.0/config
       vi es2es_kibana_sample_data_logs.conf
  2. Tambahkan konfigurasi berikut. Contoh ini mengkueri data yang diperbarui dalam 5 menit terakhir dan dijalankan setiap menit. Konfigurasi Logstash 7.x:

    Untuk Logstash 8.x, hapus pengaturan document_type => "%{[@metadata][_type]}".
    Penting

    - Timestamp Logstash dalam UTC. Jika zona waktu lokal Anda UTC+8, sesuaikan konversi timestamp tersebut. Filter rentang @timestamp dalam parameter query menggunakan UTC. - Jika indeks sumber tidak berisi field waktu, gunakan ingest pipeline dengan parameter _ingest.timestamp untuk menambahkannya.

       input{
           elasticsearch{
               # Host Elasticsearch sumber
               hosts =>  ["http://<source-es-host>:9200"]
               # Kredensial kluster sumber
               user => "<source-es-username>"
               password => "<source-es-password>"
               # Indeks yang akan disinkronkan
               index => "kibana_sample_data_logs"
               # Kueri untuk data inkremental (5 menit terakhir)
               query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
               # Jalankan setiap menit
               schedule => "* * * * *"
               scroll => "5m"
               docinfo=>true
               size => 5000
           }
       }
    
       filter {
         # Hapus field yang ditambahkan oleh Logstash
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
    
       output{
           elasticsearch{
               # Titik akhir kluster tujuan (temukan di halaman Basic Information)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Kredensial kluster tujuan
               user => "elastic"
               password => "<destination-es-password>"
               # Pertahankan nama indeks asli
               index => "%{[@metadata][_index]}"
               # Pertahankan tipe dokumen asli (hapus untuk 8.x)
               document_type => "%{[@metadata][_type]}"
               # Pertahankan ID dokumen asli
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }
  3. Arahkan ke direktori instalasi Logstash:

       cd ~/logstash-7.10.0
  4. Jalankan migrasi inkremental:

       sudo nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &
  5. Verifikasi data inkremental di kluster Alibaba Cloud Elasticsearch. Di konsol Kibana, kueri catatan yang baru diperbarui:

       GET kibana_sample_data_logs/_search
       {
         "query": {
           "range": {
             "@timestamp": {
               "gte": "now-5m",
               "lte": "now/m"
             }
           }
         },
         "sort": [
           {
             "@timestamp": {
               "order": "desc"
             }
           }
         ]
       }

Langkah 5: Verifikasi hasil migrasi

Verifikasi migrasi data penuh

  1. Di konsol Kibana kluster sumber, periksa jumlah dokumen pada indeks:

       GET _cat/indices?v
  2. Di konsol Kibana kluster Alibaba Cloud Elasticsearch, jalankan kueri yang sama:

       GET _cat/indices?v
  3. Bandingkan nilai docs.count untuk setiap indeks. Jika migrasi data penuh berhasil, jumlah dokumen pada kluster sumber dan tujuan harus sama.

Verifikasi migrasi data inkremental

  1. Di konsol Kibana kluster sumber, kueri catatan yang paling baru diperbarui:

       GET kibana_sample_data_logs/_search
       {
         "query": {
           "range": {
             "@timestamp": {
               "gte": "now-5m",
               "lte": "now/m"
             }
           }
         },
         "sort": [
           {
             "@timestamp": {
               "order": "desc"
             }
           }
         ]
       }
  2. Jalankan kueri yang sama di konsol Kibana kluster Alibaba Cloud Elasticsearch. Jika migrasi data inkremental berhasil, catatan terbaru pada kluster sumber dan tujuan harus sama.

Tuning performa

Sesuaikan parameter berikut untuk mengoptimalkan throughput migrasi:

ParameterLokasiDefaultDisarankanEfek
pipeline.batch.sizeconfig/pipelines.yml1255000Jumlah event per batch. Nilai yang lebih besar meningkatkan throughput tetapi menggunakan lebih banyak memori
-Xms / -Xmxconfig/jvm.options1g50% dari memori yang tersedia (misalnya, 8g untuk instans 16 GiB)Ukuran heap JVM. Heap yang lebih besar mendukung batch yang lebih besar
sizeKonfigurasi pipeline (input)10005000--10000Dokumen per permintaan scroll dari kluster sumber
slicesKonfigurasi pipeline (input)none (disabled)5Slice scroll paralel. Atur untuk mengaktifkan sliced scrolling guna meningkatkan paralelisme baca
scrollKonfigurasi pipeline (input)1m5mTimeout konteks scroll. Tingkatkan untuk jaringan lambat atau indeks besar
Tetapkan -Xms dan -Xmx ke nilai yang sama guna menghindari jeda akibat garbage collection JVM. Jangan melebihi 50% dari total memori sistem.

Keamanan kredensial

Hindari menyematkan kredensial secara langsung dalam file konfigurasi Logstash. Gunakan salah satu alternatif berikut:

  • Keystore Logstash: Simpan nilai sensitif di keystore Logstash dan rujuk dalam file konfigurasi menggunakan sintaks ${KEY_NAME}.

  • Variabel lingkungan: Ekspor kredensial sebagai variabel lingkungan dan rujuk dalam file konfigurasi.

Untuk informasi lebih lanjut, lihat Secrets keystore dalam dokumentasi Logstash.

Pemecahan masalah

Logstash tidak dapat terhubung ke kluster sumber

Gejala: Error connection refused atau timeout di log Logstash.

Solusi:

  1. Verifikasi bahwa host dan port Elasticsearch sumber sudah benar.

  2. Konfirmasi bahwa security group ECS mengizinkan lalu lintas keluar ke kluster sumber.

  3. Uji konektivitas dengan curl <source-es-host>:9200 dari instans ECS.

Logstash tidak dapat terhubung ke kluster tujuan

Gejala: Kegagalan autentikasi atau timeout koneksi saat menulis ke kluster Alibaba Cloud Elasticsearch.

Solusi:

  1. Verifikasi titik akhir kluster tujuan di halaman Basic Information pada konsol Alibaba Cloud Elasticsearch.

  2. Konfirmasi bahwa instans ECS berada dalam VPC yang sama dengan kluster tujuan.

  3. Periksa apakah username dan password sudah benar.

Jumlah dokumen tidak cocok setelah migrasi

Gejala: Kluster tujuan memiliki lebih sedikit dokumen daripada kluster sumber.

Solusi:

  1. Periksa log Logstash untuk error: tail -f logs/logstash-plain.log.

  2. Verifikasi bahwa nama indeks dalam konfigurasi pipeline sesuai dengan indeks sumber.

  3. Jika menggunakan pola wildcard, konfirmasi bahwa pola tersebut mencakup semua indeks yang dimaksud.

  4. Jalankan ulang pipeline migrasi penuh. Logstash menggunakan ID dokumen untuk mencegah duplikat saat document_id diatur.

Data duplikat setelah migrasi penuh

Gejala: Kluster tujuan memiliki lebih banyak dokumen daripada kluster sumber.

Solusi: Hal ini terjadi ketika Logstash secara otomatis me-restart pipeline setelah selesai. Gunakan parameter schedule dengan ekspresi cron untuk menjalankan pipeline hanya sekali (misalnya, schedule => "20 13 5 3 *").

Ketidaksesuaian zona waktu dalam migrasi inkremental

Gejala: Kueri inkremental mengembalikan hasil yang tidak diharapkan atau melewatkan data terbaru.

Solusi: Timestamp Logstash menggunakan UTC. Jika data sumber menggunakan zona waktu berbeda (misalnya, UTC+8), sesuaikan rentang @timestamp dalam parameter query sesuai.

Konflik pemetaan indeks

Gejala: Error tentang konflik tipe field saat mengindeks data di kluster tujuan.

Solusi:

  1. Jalankan skrip migrasi metadata indeks (Langkah 2) sebelum migrasi data.

  2. Verifikasi bahwa pemetaan di kluster tujuan sesuai dengan sumber: GET /<index-name>/_mapping.

  3. Jika konflik tetap ada, hapus indeks yang dibuat otomatis di kluster tujuan dan buat ulang dengan pemetaan yang benar.

Daftar periksa pasca-migrasi

Setelah migrasi selesai, verifikasi hal-hal berikut:

  • Jumlah dokumen pada sumber dan tujuan cocok untuk semua indeks yang dimigrasikan.

  • Konten dokumen sampel identik (periksa beberapa dokumen secara acak).

  • Pemetaan indeks di kluster tujuan sudah benar.

  • Tidak ada error di log Logstash (logs/logstash-plain.log).

  • Titik akhir aplikasi telah diperbarui untuk mengarah ke kluster Alibaba Cloud Elasticsearch.

  • Konektivitas aplikasi ke kluster baru telah diverifikasi.

  • Pipeline sinkronisasi inkremental dihentikan setelah cutover.

  • Pipeline Logstash dibersihkan pada instance ECS.

  • Kluster tujuan dipantau untuk mendeteksi error dan memantau performa.

Referensi