全部产品
Search
文档中心

Elasticsearch:Sinkronisasi data dari Azure Event Hubs ke Alibaba Cloud Elasticsearch melalui Logstash

更新时间:Mar 03, 2026

Panduan ini menjelaskan cara menggunakan Alibaba Cloud Logstash untuk mengonsumsi data dari Azure Event Hubs dan menyinkronkannya ke kluster Elasticsearch.

Prasyarat

Langkah 1: Buat dan konfigurasikan pipeline Logstash

  1. Buka halaman Logstash Clusters.

  2. Arahkan ke kluster target.

    1. Pada bilah navigasi atas, pilih wilayah tempat kluster berada.

    2. Pada halaman Logstash Clusters, temukan kluster tersebut lalu klik ID-nya.

  3. Pada menu navigasi kiri, klik Pipelines.

  4. Klik Create Pipeline.

  5. Pada halaman Create, masukkan ID pipeline dan tempel kode berikut ke dalam Config Settings:

    input {
      azure_event_hubs {
         event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
         initial_position => "beginning"
         threads => 2
         decorate_events => true
         consumer_group => "group-kl"
         storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
         storage_container => "lettie_container"
       }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
        index => "test-log"
        password => "xxxxxx"
        user => "elastic"
      }
    }

    Parameter

    Kategori

    Parameter

    Deskripsi

    Azure

    event_hub_connections

    String koneksi untuk hub Anda. Termasuk EntityPath (nama Hub). Untuk informasi lebih lanjut, lihat event_hub_connections.

    Catatan

    Parameter event_hub_connections didefinisikan untuk setiap event hub. Parameter lainnya digunakan bersama oleh semua event hub.

    initial_position

    Posisi awal pembacaan data di event hub. Nilai yang valid: beginning (default), end, dan look_back. Untuk informasi lebih lanjut, lihat initial_position.

    threads

    Jumlah total thread untuk pemrosesan event. Untuk informasi lebih lanjut, lihat threads.

    decorate_events

    Menentukan apakah metadata event hub akan disinkronkan. Metadata mencakup nama event hub, consumer_group, processor_host, partition, offset, sequence, timestamp, dan event_size. Untuk informasi lebih lanjut, lihat decorate_events.

    consumer_group

    Gunakan kelompok khusus untuk Logstash. Beberapa node Logstash dalam kelompok ini akan berbagi beban. Untuk informasi lebih lanjut, lihat consumer_group.

    storage_connection

    String koneksi untuk Azure Blob Storage. Ini menyimpan offset sehingga Logstash dapat melanjutkan dari posisi terakhir setelah restart. Untuk informasi lebih lanjut, lihat storage_connection.

    storage_container

    Nama container penyimpanan yang digunakan untuk menyimpan offset dan memungkinkan beberapa node Logstash bekerja bersama. Untuk informasi lebih lanjut, lihat storage_container.

    Catatan

    Untuk menghindari penimpaan offset, gunakan nama storage_container yang berbeda. Jika data yang sama ditulis ke layanan berbeda, Anda harus mengatur parameter ini dengan nama yang berbeda.

    Elasticsearch

    hosts

    Titik akhir Elasticsearch Anda. Atur nilainya menjadi http://<ID instans Alibaba Cloud Elasticsearch>.elasticsearch.aliyuncs.com:9200.

    index

    Nama indeks tujuan di Elasticsearch.

    user

    Username untuk mengakses Elasticsearch. Default: elastic.

    password

    Password untuk pengguna Elasticsearch.

    Untuk informasi lebih lanjut, lihat File konfigurasi Logstash.

  6. Klik Next dan konfigurasikan parameter pipeline.

    管道参数配置

    Parameter

    Deskripsi

    Pipeline Workers

    Jumlah thread pekerja untuk menjalankan tahap filter dan output pipeline secara paralel. Jika terjadi backlog event atau CPU belum jenuh, pertimbangkan untuk menambah jumlah thread agar lebih memanfaatkan daya pemrosesan CPU. Nilai default: Jumlah core CPU pada instans.

    Pipeline Batch Size

    Jumlah maksimum event yang dapat dikumpulkan oleh satu thread pekerja dari input sebelum mencoba menjalankan filter dan output. Ukuran batch yang lebih besar dapat menyebabkan overhead memori yang lebih tinggi. Anda dapat meningkatkan ukuran heap JVM dengan mengatur variabel LS_HEAP_SIZE agar nilai ini efektif. Nilai default: 125.

    Pipeline Batch Delay

    Durasi dalam milidetik untuk menunggu setiap event sebelum mengirimkan batch kecil ke thread pekerja pipeline. Nilai default: 50 ms.

    Queue Type

    Model antrian internal untuk buffering event. Nilai yang valid:

    • MEMORY: Default. Antrian tradisional berbasis memori.

    • PERSISTED: Antrian berbasis disk dengan ACK (antrian persisten).

    Queue Max Bytes

    Jumlah maksimum data yang dapat disimpan oleh antrian, dalam MB. Nilainya harus berupa bilangan bulat dari 1 hingga 2<sup>53</sup>-1. Nilai default adalah 1024 MB.

    Catatan

    Pastikan nilai ini lebih kecil dari kapasitas disk total.

    Queue Checkpoint Writes

    Jika antrian persisten diaktifkan, ini adalah jumlah maksimum event yang dapat ditulis sebelum checkpoint dipaksakan. Nilai 0 berarti tidak ada batas. Nilai default: 1024.

    Peringatan

    Men-deploy atau memperbarui pipeline akan memicu restart kluster Logstash. Pastikan hal ini sesuai dengan jendela pemeliharaan Anda.

  7. Klik Save atau Save and Deploy.

    • Save: Menyimpan konfigurasi pipeline di Logstash. Konfigurasi tidak berlaku hingga di-deploy. Setelah disimpan, Anda akan kembali ke halaman Pipelines. Klik Deploy Now pada kolom Actions untuk me-restart instans dan menerapkan konfigurasi.

    • Save and Deploy: Menyimpan dan menerapkan konfigurasi. Ini akan me-restart instans dan menerapkan konfigurasi.

Langkah 3: Verifikasi sinkronisasi data

  1. Masuk ke konsol Kibana kluster Elasticsearch Anda dan buka halaman utama Kibana.

  2. Pada menu navigasi kiri, klik Dev tools.

  3. Di Console, jalankan perintah berikut untuk memverifikasi aliran data:

    GET test-log3/_search
    {
      "query":{
        "match":{
          "message":"L23"
         }
       }
    }

    Hasil yang diharapkan:预期结果