Anda dapat menggunakan pipeline Alibaba Cloud Logstash untuk menyinkronkan data dari Azure Event Hubs ke instans Alibaba Cloud Elasticsearch.
Prosedur
Langkah 1: Siapkan lingkungan dan instans
- Buat instans Alibaba Cloud Elasticsearch dan aktifkan pembuatan indeks otomatis. Contoh ini menggunakan instans versi 7.10.
Untuk informasi lebih lanjut, lihat Buat instans Alibaba Cloud Elasticsearch dan Konfigurasikan parameter YML.
- Buat instans Alibaba Cloud Logstash dan konfigurasikan NAT Gateway untuk akses jaringan publik. Contoh ini menggunakan instans versi 7.4.
Untuk informasi lebih lanjut, lihat Buat instans Alibaba Cloud Logstash.Instans Alibaba Cloud Logstash dideploy di VPC. Untuk memungkinkan Logstash berkomunikasi dengan Azure Event Hubs melalui internet publik, konfigurasikan NAT Gateway untuk VPC tersebut. Untuk informasi lebih lanjut, lihat Konfigurasikan akses jaringan publik menggunakan NAT Gateway.Catatan Untuk instalasi Logstash yang dikelola sendiri, beli instans ECS di VPC yang sama dengan instans Alibaba Cloud Elasticsearch Anda dan tetapkan Elastic IP Address ke instans tersebut. Anda tidak perlu membeli instans ECS baru jika sudah memiliki instans yang memenuhi persyaratan.
- Siapkan lingkungan Azure Event Hubs Anda.
Untuk informasi lebih lanjut, lihat dokumentasi resmi Azure Event Hubs.
Langkah 2: Buat dan konfigurasikan pipeline Logstash
Buka halaman Logstash Clusters.
Navigasikan ke kluster target.
Di bilah navigasi atas, pilih wilayah tempat kluster berada.
Di halaman Logstash Clusters, temukan kluster tersebut lalu klik ID-nya.
-
Di panel navigasi kiri, klik Pipelines.
-
Klik Create Pipeline.
- Di halaman Create, masukkan Pipeline ID dan konfigurasikan pipeline tersebut.
Konfigurasi pipeline berikut digunakan dalam contoh ini.
input { azure_event_hubs { event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"] initial_position => "beginning" threads => 2 decorate_events => true consumer_group => "group-kl" storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn" storage_container => "lettie_container" } } filter { } output { elasticsearch { hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"] index => "test-log" password => "xxxxxx" user => "elastic" } }Tabel 1. Parameter input Parameter Deskripsi event_hub_connections Daftar string koneksi untuk event hub yang akan dibaca. String koneksi mencakup EntityPathdari event hub. Untuk informasi lebih lanjut, lihat event_hub_connections.Catatan Setiap event hub memerlukan parameter event_hub_connections terpisah. Parameter lain berlaku untuk semua event hub.initial_position Posisi dalam event hub tempat pembacaan data dimulai. Nilai yang valid: beginning(default),end, danlook_back. Untuk informasi lebih lanjut, lihat initial position .threads Jumlah total thread untuk pemrosesan event. Untuk informasi lebih lanjut, lihat threads. decorate_events Menentukan apakah metadata event hub disertakan dalam data yang disinkronkan, seperti nama event hub, kelompok konsumen, host prosesor, partisi, offset, urutan, stempel waktu, dan ukuran event. Untuk informasi lebih lanjut, lihat decorate events. consumer_group Kelompok konsumen yang digunakan untuk membaca data dari event hub. Buat kelompok konsumen khusus untuk Logstash dan pastikan semua node Logstash menggunakannya untuk mengoordinasikan pekerjaan mereka. Untuk informasi lebih lanjut, lihat consumer group. storage_connection String koneksi untuk akun Azure Blob Storage. Penyimpanan ini menyimpan offset saat restart, sehingga beberapa node Logstash dapat memproses partisi yang berbeda. Jika parameter ini diatur, Logstash melanjutkan pemrosesan dari posisi terakhir sebelum restart. Jika tidak diatur, Logstash melanjutkan dari posisi yang ditentukan oleh initial_position. Untuk informasi lebih lanjut, lihat storage connection. storage_container Nama kontainer penyimpanan yang digunakan untuk menyimpan offset secara persisten dan mengoordinasikan pekerjaan di antara beberapa node Logstash. Untuk informasi lebih lanjut, lihat storage container. Catatan Untuk menghindari penimpaan offset, gunakan nama storage_container yang unik untuk setiap pipeline. Jika Anda menulis data yang sama ke beberapa layanan, tentukan nama yang berbeda untuk setiap pipeline.Tabel 2. Parameter output Parameter Deskripsi hosts Titik akhir instans Alibaba Cloud Elasticsearch Anda. Formatnya adalah http://<ID instans Alibaba Cloud Elasticsearch Anda>.elasticsearch.aliyuncs.com:9200.index Nama indeks tujuan. user Username untuk mengakses instans Elasticsearch. Nilai default: elastic.password Password untuk user yang ditentukan. Anda menetapkan password untuk user elasticsaat membuat instans Elasticsearch. Jika lupa password, Anda dapat mengaturnya ulang. Untuk informasi lebih lanjut, lihat Atur ulang password akses instans.Untuk informasi lebih lanjut tentang konfigurasi pipeline, lihat File Konfigurasi Logstash.
-
Klik Next step dan konfigurasikan parameter pipeline.
Parameter
Deskripsi
Pipeline Workers
Jumlah thread pekerja untuk tahap filter dan output. Tingkatkan nilai ini jika terjadi backlog event atau CPU kurang dimanfaatkan. Default: jumlah core CPU.
Pipeline Batch Size
Jumlah maksimum event yang dikumpulkan pekerja sebelum mengeksekusi filter dan output. Batch yang lebih besar meningkatkan penggunaan memori dan mungkin memerlukan ukuran heap JVM (LS_HEAP_SIZE) yang lebih besar. Default: 125.
Pipeline Batch Delay
Waktu tunggu dalam milidetik sebelum mengirimkan batch yang belum penuh ke thread pekerja. Default: 50 ms.
Queue Type
Model antrian internal untuk buffering event. Nilai yang valid:
-
MEMORY: Default. Menggunakan antrian dalam memori.
-
PERSISTED: Antrian persisten berbasis disk.
Queue Max Bytes
Jumlah maksimum data yang dapat disimpan oleh antrian, dalam
MB. Nilainya harus berupa bilangan bulat dari1hingga2<sup>53</sup>-1. Nilai default:1024.CatatanPastikan nilai ini lebih kecil dari kapasitas disk total Anda.
Queue Checkpoint Writes
Jumlah maksimum event yang ditulis sebelum checkpoint dipaksakan (hanya untuk antrian persisten). Nilai 0 berarti tanpa batas. Default: 1024.
PeringatanMenyimpan dan menerapkan konfigurasi akan memicu restart instans. Lanjutkan hanya jika hal ini tidak berdampak pada bisnis Anda.
-
-
Klik Save atau Save and Deploy.
-
Save: Menyimpan konfigurasi pipeline tetapi tidak menerapkannya. Setelah disimpan, Anda akan dikembalikan ke halaman Pipelines. Di bagian Pipelines, Anda dapat mengklik Deploy Now di kolom Actions untuk merestart instans dan menerapkan konfigurasi.
-
Save and Deploy: Menyimpan dan menerapkan konfigurasi, serta merestart instans untuk menerapkan perubahan.
-
Langkah 3: Verifikasi hasilnya
Masuk ke konsol Kibana kluster Elasticsearch Anda dan buka halaman utama Kibana.
Di menu navigasi kiri, klik Dev tools.
- Di Console, jalankan perintah berikut untuk memeriksa data yang telah disinkronkan.
GET test-log3/_search { "query":{ "match":{ "message":"L23" } } }Output yang diharapkan: