All Products
Search
Document Center

Elasticsearch:Sinkronkan Azure Event Hubs ke Alibaba Cloud Elasticsearch dengan Logstash

Last Updated:Jun 23, 2026

Anda dapat menggunakan pipeline Alibaba Cloud Logstash untuk menyinkronkan data dari Azure Event Hubs ke instans Alibaba Cloud Elasticsearch.

Prosedur

  1. Langkah 1: Siapkan lingkungan dan instans
  2. Langkah 2: Buat dan konfigurasikan pipeline Logstash
  3. Langkah 3: Verifikasi hasilnya

Langkah 1: Siapkan lingkungan dan instans

  1. Buat instans Alibaba Cloud Elasticsearch dan aktifkan pembuatan indeks otomatis. Contoh ini menggunakan instans versi 7.10.
  2. Buat instans Alibaba Cloud Logstash dan konfigurasikan NAT Gateway untuk akses jaringan publik. Contoh ini menggunakan instans versi 7.4.
    Untuk informasi lebih lanjut, lihat Buat instans Alibaba Cloud Logstash.
    Instans Alibaba Cloud Logstash dideploy di VPC. Untuk memungkinkan Logstash berkomunikasi dengan Azure Event Hubs melalui internet publik, konfigurasikan NAT Gateway untuk VPC tersebut. Untuk informasi lebih lanjut, lihat Konfigurasikan akses jaringan publik menggunakan NAT Gateway.
    Catatan Untuk instalasi Logstash yang dikelola sendiri, beli instans ECS di VPC yang sama dengan instans Alibaba Cloud Elasticsearch Anda dan tetapkan Elastic IP Address ke instans tersebut. Anda tidak perlu membeli instans ECS baru jika sudah memiliki instans yang memenuhi persyaratan.
  3. Siapkan lingkungan Azure Event Hubs Anda.
    Untuk informasi lebih lanjut, lihat dokumentasi resmi Azure Event Hubs.

Langkah 2: Buat dan konfigurasikan pipeline Logstash

  1. Buka halaman Logstash Clusters.

  2. Navigasikan ke kluster target.

    1. Di bilah navigasi atas, pilih wilayah tempat kluster berada.

    2. Di halaman Logstash Clusters, temukan kluster tersebut lalu klik ID-nya.

  3. Di panel navigasi kiri, klik Pipelines.

  4. Klik Create Pipeline.

  5. Di halaman Create, masukkan Pipeline ID dan konfigurasikan pipeline tersebut.
    Konfigurasi pipeline berikut digunakan dalam contoh ini.
    input {
      azure_event_hubs {
         event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
         initial_position => "beginning"
         threads => 2
         decorate_events => true
         consumer_group => "group-kl"
         storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
         storage_container => "lettie_container"
       }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
        index => "test-log"
        password => "xxxxxx"
        user => "elastic"
      }
    }
    Tabel 1. Parameter input
    Parameter Deskripsi
    event_hub_connections Daftar string koneksi untuk event hub yang akan dibaca. String koneksi mencakup EntityPath dari event hub. Untuk informasi lebih lanjut, lihat event_hub_connections.
    Catatan Setiap event hub memerlukan parameter event_hub_connections terpisah. Parameter lain berlaku untuk semua event hub.
    initial_position Posisi dalam event hub tempat pembacaan data dimulai. Nilai yang valid: beginning (default), end, dan look_back. Untuk informasi lebih lanjut, lihat initial position .
    threads Jumlah total thread untuk pemrosesan event. Untuk informasi lebih lanjut, lihat threads.
    decorate_events Menentukan apakah metadata event hub disertakan dalam data yang disinkronkan, seperti nama event hub, kelompok konsumen, host prosesor, partisi, offset, urutan, stempel waktu, dan ukuran event. Untuk informasi lebih lanjut, lihat decorate events.
    consumer_group Kelompok konsumen yang digunakan untuk membaca data dari event hub. Buat kelompok konsumen khusus untuk Logstash dan pastikan semua node Logstash menggunakannya untuk mengoordinasikan pekerjaan mereka. Untuk informasi lebih lanjut, lihat consumer group.
    storage_connection String koneksi untuk akun Azure Blob Storage. Penyimpanan ini menyimpan offset saat restart, sehingga beberapa node Logstash dapat memproses partisi yang berbeda. Jika parameter ini diatur, Logstash melanjutkan pemrosesan dari posisi terakhir sebelum restart. Jika tidak diatur, Logstash melanjutkan dari posisi yang ditentukan oleh initial_position. Untuk informasi lebih lanjut, lihat storage connection.
    storage_container Nama kontainer penyimpanan yang digunakan untuk menyimpan offset secara persisten dan mengoordinasikan pekerjaan di antara beberapa node Logstash. Untuk informasi lebih lanjut, lihat storage container.
    Catatan Untuk menghindari penimpaan offset, gunakan nama storage_container yang unik untuk setiap pipeline. Jika Anda menulis data yang sama ke beberapa layanan, tentukan nama yang berbeda untuk setiap pipeline.
    Tabel 2. Parameter output
    Parameter Deskripsi
    hosts Titik akhir instans Alibaba Cloud Elasticsearch Anda. Formatnya adalah http://<ID instans Alibaba Cloud Elasticsearch Anda>.elasticsearch.aliyuncs.com:9200.
    index Nama indeks tujuan.
    user Username untuk mengakses instans Elasticsearch. Nilai default: elastic.
    password Password untuk user yang ditentukan. Anda menetapkan password untuk user elastic saat membuat instans Elasticsearch. Jika lupa password, Anda dapat mengaturnya ulang. Untuk informasi lebih lanjut, lihat Atur ulang password akses instans.

    Untuk informasi lebih lanjut tentang konfigurasi pipeline, lihat File Konfigurasi Logstash.

  6. Klik Next step dan konfigurasikan parameter pipeline.

    Parameter

    Deskripsi

    Pipeline Workers

    Jumlah thread pekerja untuk tahap filter dan output. Tingkatkan nilai ini jika terjadi backlog event atau CPU kurang dimanfaatkan. Default: jumlah core CPU.

    Pipeline Batch Size

    Jumlah maksimum event yang dikumpulkan pekerja sebelum mengeksekusi filter dan output. Batch yang lebih besar meningkatkan penggunaan memori dan mungkin memerlukan ukuran heap JVM (LS_HEAP_SIZE) yang lebih besar. Default: 125.

    Pipeline Batch Delay

    Waktu tunggu dalam milidetik sebelum mengirimkan batch yang belum penuh ke thread pekerja. Default: 50 ms.

    Queue Type

    Model antrian internal untuk buffering event. Nilai yang valid:

    • MEMORY: Default. Menggunakan antrian dalam memori.

    • PERSISTED: Antrian persisten berbasis disk.

    Queue Max Bytes

    Jumlah maksimum data yang dapat disimpan oleh antrian, dalam MB. Nilainya harus berupa bilangan bulat dari 1 hingga 2<sup>53</sup>-1. Nilai default: 1024.

    Catatan

    Pastikan nilai ini lebih kecil dari kapasitas disk total Anda.

    Queue Checkpoint Writes

    Jumlah maksimum event yang ditulis sebelum checkpoint dipaksakan (hanya untuk antrian persisten). Nilai 0 berarti tanpa batas. Default: 1024.

    Peringatan

    Menyimpan dan menerapkan konfigurasi akan memicu restart instans. Lanjutkan hanya jika hal ini tidak berdampak pada bisnis Anda.

  7. Klik Save atau Save and Deploy.

    • Save: Menyimpan konfigurasi pipeline tetapi tidak menerapkannya. Setelah disimpan, Anda akan dikembalikan ke halaman Pipelines. Di bagian Pipelines, Anda dapat mengklik Deploy Now di kolom Actions untuk merestart instans dan menerapkan konfigurasi.

    • Save and Deploy: Menyimpan dan menerapkan konfigurasi, serta merestart instans untuk menerapkan perubahan.

Langkah 3: Verifikasi hasilnya

  1. Masuk ke konsol Kibana kluster Elasticsearch Anda dan buka halaman utama Kibana.

  2. Di menu navigasi kiri, klik Dev tools.

  3. Di Console, jalankan perintah berikut untuk memeriksa data yang telah disinkronkan.
    GET test-log3/_search
    {
      "query":{
        "match":{
          "message":"L23"
         }
       }
    }
    Output yang diharapkan:预期结果