Panduan ini menjelaskan cara menggunakan Alibaba Cloud Logstash untuk mengonsumsi data dari Azure Event Hubs dan menyinkronkannya ke kluster Elasticsearch.
Prasyarat
Kluster Alibaba Cloud Elasticsearch telah dibuat.
Contoh ini menggunakan versi 7.10.
Auto indexing diaktifkan dalam pengaturan YML kluster Elasticsearch.
Pengaturan ini memungkinkan Logstash membuat indeks tujuan secara otomatis. Lihat Konfigurasi parameter YML.
Instans Alibaba Cloud Logstash telah dibuat.
Contoh ini menggunakan versi 7.4.
Gateway NAT dengan aturan SNAT telah dikonfigurasi untuk VPC Logstash.
Pengaturan ini memungkinkan Logstash (di dalam VPC) berkomunikasi dengan Azure Event Hubs (di jaringan publik). Lihat Konfigurasi Gateway NAT untuk transmisi data Internet.
Lingkungan Azure Event Hubs telah disiapkan:
Namespace dan instans Event Hub yang aktif.
Connection String (Primary Key) serta akun Blob Storage untuk pelacakan offset (checkpointing).
Langkah 1: Buat dan konfigurasikan pipeline Logstash
Buka halaman Logstash Clusters.
Arahkan ke kluster target.
Pada bilah navigasi atas, pilih wilayah tempat kluster berada.
Pada halaman Logstash Clusters, temukan kluster tersebut lalu klik ID-nya.
Pada menu navigasi kiri, klik Pipelines.
Klik Create Pipeline.
Pada halaman Create, masukkan ID pipeline dan tempel kode berikut ke dalam Config Settings:
input { azure_event_hubs { event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"] initial_position => "beginning" threads => 2 decorate_events => true consumer_group => "group-kl" storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn" storage_container => "lettie_container" } } filter { } output { elasticsearch { hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"] index => "test-log" password => "xxxxxx" user => "elastic" } }Parameter
Kategori
Parameter
Deskripsi
Azure
event_hub_connections
String koneksi untuk hub Anda. Termasuk
EntityPath(nama Hub). Untuk informasi lebih lanjut, lihat event_hub_connections.CatatanParameter event_hub_connections didefinisikan untuk setiap event hub. Parameter lainnya digunakan bersama oleh semua event hub.
initial_position
Posisi awal pembacaan data di event hub. Nilai yang valid: beginning (default), end, dan look_back. Untuk informasi lebih lanjut, lihat initial_position.
threads
Jumlah total thread untuk pemrosesan event. Untuk informasi lebih lanjut, lihat threads.
decorate_events
Menentukan apakah metadata event hub akan disinkronkan. Metadata mencakup nama event hub, consumer_group, processor_host, partition, offset, sequence, timestamp, dan event_size. Untuk informasi lebih lanjut, lihat decorate_events.
consumer_group
Gunakan kelompok khusus untuk Logstash. Beberapa node Logstash dalam kelompok ini akan berbagi beban. Untuk informasi lebih lanjut, lihat consumer_group.
storage_connection
String koneksi untuk Azure Blob Storage. Ini menyimpan offset sehingga Logstash dapat melanjutkan dari posisi terakhir setelah restart. Untuk informasi lebih lanjut, lihat storage_connection.
storage_container
Nama container penyimpanan yang digunakan untuk menyimpan offset dan memungkinkan beberapa node Logstash bekerja bersama. Untuk informasi lebih lanjut, lihat storage_container.
CatatanUntuk menghindari penimpaan offset, gunakan nama storage_container yang berbeda. Jika data yang sama ditulis ke layanan berbeda, Anda harus mengatur parameter ini dengan nama yang berbeda.
Elasticsearch
hosts
Titik akhir Elasticsearch Anda. Atur nilainya menjadi
http://<ID instans Alibaba Cloud Elasticsearch>.elasticsearch.aliyuncs.com:9200.index
Nama indeks tujuan di Elasticsearch.
user
Username untuk mengakses Elasticsearch. Default:
elastic.password
Password untuk pengguna Elasticsearch.
Untuk informasi lebih lanjut, lihat File konfigurasi Logstash.
Klik Next dan konfigurasikan parameter pipeline.

Parameter
Deskripsi
Pipeline Workers
Jumlah thread pekerja untuk menjalankan tahap filter dan output pipeline secara paralel. Jika terjadi backlog event atau CPU belum jenuh, pertimbangkan untuk menambah jumlah thread agar lebih memanfaatkan daya pemrosesan CPU. Nilai default: Jumlah core CPU pada instans.
Pipeline Batch Size
Jumlah maksimum event yang dapat dikumpulkan oleh satu thread pekerja dari input sebelum mencoba menjalankan filter dan output. Ukuran batch yang lebih besar dapat menyebabkan overhead memori yang lebih tinggi. Anda dapat meningkatkan ukuran heap JVM dengan mengatur variabel LS_HEAP_SIZE agar nilai ini efektif. Nilai default: 125.
Pipeline Batch Delay
Durasi dalam milidetik untuk menunggu setiap event sebelum mengirimkan batch kecil ke thread pekerja pipeline. Nilai default: 50 ms.
Queue Type
Model antrian internal untuk buffering event. Nilai yang valid:
MEMORY: Default. Antrian tradisional berbasis memori.
PERSISTED: Antrian berbasis disk dengan ACK (antrian persisten).
Queue Max Bytes
Jumlah maksimum data yang dapat disimpan oleh antrian, dalam
MB. Nilainya harus berupa bilangan bulat dari1hingga2<sup>53</sup>-1. Nilai default adalah1024 MB.CatatanPastikan nilai ini lebih kecil dari kapasitas disk total.
Queue Checkpoint Writes
Jika antrian persisten diaktifkan, ini adalah jumlah maksimum event yang dapat ditulis sebelum checkpoint dipaksakan. Nilai 0 berarti tidak ada batas. Nilai default: 1024.
PeringatanMen-deploy atau memperbarui pipeline akan memicu restart kluster Logstash. Pastikan hal ini sesuai dengan jendela pemeliharaan Anda.
Klik Save atau Save and Deploy.
Save: Menyimpan konfigurasi pipeline di Logstash. Konfigurasi tidak berlaku hingga di-deploy. Setelah disimpan, Anda akan kembali ke halaman Pipelines. Klik Deploy Now pada kolom Actions untuk me-restart instans dan menerapkan konfigurasi.
Save and Deploy: Menyimpan dan menerapkan konfigurasi. Ini akan me-restart instans dan menerapkan konfigurasi.
Langkah 3: Verifikasi sinkronisasi data
Masuk ke konsol Kibana kluster Elasticsearch Anda dan buka halaman utama Kibana.
Pada menu navigasi kiri, klik Dev tools.
Di Console, jalankan perintah berikut untuk memverifikasi aliran data:
GET test-log3/_search { "query":{ "match":{ "message":"L23" } } }Hasil yang diharapkan:
