Logstash memproses data melalui pipeline yang terdiri atas input, filter, dan output. Dengan menghubungkan instans ApsaraMQ for Kafka sebagai input, Logstash mengonsumsi pesan dari topik Kafka dan meneruskannya ke tujuan seperti Elasticsearch, penyimpanan file, atau stdout.
Arsitektur ini memberikan dua keuntungan:
Pemrosesan asinkron: Kafka menyimpan sementara pesan sehingga Logstash dapat memprosesnya sesuai kecepatannya sendiri, mencegah kehilangan data selama lonjakan lalu lintas.
Decoupling: Jika sistem downstream seperti Elasticsearch mati, Kafka tetap menyimpan pesan hingga sistem tersebut pulih. Produsen upstream tidak terpengaruh.
Topik ini menjelaskan cara mengonfigurasi Logstash untuk mengonsumsi pesan dari instans ApsaraMQ for Kafka melalui koneksi virtual private cloud (VPC).
Prasyarat
Sebelum memulai, pastikan Anda telah:
Membeli dan men-deploy instans ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Purchase and deploy a VPC-connected instance.
Logstash telah terinstal. Untuk informasi selengkapnya, lihat Unduh Logstash.
Menginstal Java Development Kit (JDK) 8. Untuk informasi lebih lanjut, lihat halaman unduhan Java 8.
Memastikan plugin input Kafka untuk Logstash tersedia. Jalankan perintah berikut untuk memverifikasi. Jika plugin tidak terdaftar, instal plugin tersebut:
bin/logstash-plugin list | grep logstash-input-kafkabin/logstash-plugin install logstash-input-kafka
Langkah 1: Dapatkan endpoint
Logstash terhubung ke ApsaraMQ for Kafka melalui endpoint VPC. ApsaraMQ for Kafka menyediakan dua jenis endpoint VPC:
| Jenis endpoint | Port | Kapan digunakan |
|---|---|---|
| Default endpoint | 9092 | Akses standar tanpa autentikasi |
| Simple Authentication and Security Layer (SASL) endpoint | 9094 | Akses terotentikasi. Memerlukan fitur access control list (ACL) diaktifkan. Untuk informasi lebih lanjut, lihat Enable the ACL feature. |
Untuk informasi lebih lanjut tentang perbedaan endpoint, lihat Comparison among endpoints.
Untuk mendapatkan endpoint:
Login ke ApsaraMQ for Kafka console.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda dideploy.
Di halaman Instances, klik nama instans target.
Di halaman Instance Details, temukan endpoint pada bagian Endpoint Information. Jika Anda berencana menggunakan endpoint SASL, catat nilai Username dan Password pada bagian Configuration Information.

Langkah 2: Buat topik
Buat topik untuk menyimpan pesan yang akan dikonsumsi oleh Logstash.
Buat topik di wilayah yang sama dengan instans Elastic Compute Service (ECS) Anda. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen berjalan pada instans ECS di wilayah China (Beijing), topik juga harus berada di wilayah China (Beijing).
Login ke ApsaraMQ for Kafka console.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda dideploy.
Di halaman Instances, klik nama instans target.
Di panel navigasi sebelah kiri, klik Topics.
Di halaman Topics, klik Create Topic.
Di panel Create Topic, konfigurasikan parameter berikut lalu klik OK.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Name | Nama topik. | demo |
| Description | Deskripsi topik. | demo test |
| Partitions | Jumlah partisi. | 12 |
| Storage Engine | Jenis mesin penyimpanan. Hanya tersedia untuk instans Edisi Profesional. Instans Edisi Standar secara default menggunakan Cloud Storage. Opsi: Cloud Storage -- Menggunakan disk Alibaba Cloud dengan tiga replika dalam mode terdistribusi. Memberikan latensi rendah, performa tinggi, dan keandalan tinggi. Wajib digunakan ketika edisi instans adalah Standard (High Write). Local Storage -- Menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dengan tiga replika dalam mode terdistribusi. | Cloud Storage |
| Message Type | Jenis pengurutan pesan. Normal Message -- Pesan dengan kunci yang sama disimpan di partisi yang sama sesuai urutan pengiriman. Pengurutan partisi mungkin tidak dipertahankan selama kegagalan broker. Dipilih secara otomatis ketika Storage Engine adalah Cloud Storage. Partitionally Ordered Message -- Pesan dengan kunci yang sama tetap terurut di partisi yang sama bahkan selama kegagalan broker. Beberapa partisi mungkin menjadi tidak tersedia sementara. Dipilih secara otomatis ketika Storage Engine adalah Local Storage. | Normal Message |
| Log Cleanup Policy | Kebijakan pembersihan log. Hanya tersedia ketika Storage Engine adalah Local Storage (hanya untuk Edisi Profesional). Delete -- Kebijakan default. Menyimpan pesan berdasarkan periode retensi maksimum. Menghapus pesan paling awal ketika penggunaan penyimpanan melebihi 85%. Compact -- Hanya menyimpan nilai terbaru untuk setiap kunci pesan. Digunakan oleh komponen seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos. Penting Anda hanya dapat menggunakan topik dengan log-compacted pada komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. | Compact |
| Tag | Tag yang dilampirkan ke topik. | demo |
Setelah topik dibuat, topik tersebut akan muncul di halaman Topics.
Langkah 3: Kirim pesan uji
Kirim pesan uji untuk memverifikasi bahwa topik siap dikonsumsi.
Login ke ApsaraMQ for Kafka console.
Navigasi ke instans target lalu klik Topics di panel navigasi sebelah kiri.
Klik nama topik, lalu klik Send Message di pojok kanan atas halaman Topic Details.
Di panel Start to Send and Consume Message, pilih Sending Method (Console, Docker, atau SDK) lalu kirim pesan uji. Jika Anda ingin mengirim pesan ke partisi tertentu, Anda dapat menentukan ID partisi tersebut. Untuk informasi cara menanyakan ID partisi, lihat View partition status.
Langkah 4: Buat kelompok konsumen
Buat kelompok konsumen agar Logstash dapat melacak offset konsumsinya.
Login ke ApsaraMQ for Kafka console.
Navigasi ke instans target lalu klik Groups di panel navigasi sebelah kiri.
Di halaman Groups, klik Create Group.
Di panel Create Group, masukkan Group ID dan Description, lampirkan tag jika diperlukan, lalu klik OK.
Setelah kelompok konsumen dibuat, kelompok tersebut akan muncul di halaman Groups.
Langkah 5: Konfigurasi dan jalankan Logstash
Navigasi ke direktori instalasi Logstash.
Buat file konfigurasi bernama
input.confdi direktoribin:
input {
kafka {
bootstrap_servers => "<your-kafka-endpoint>" # Endpoint VPC, contoh: alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092
group_id => "<your-consumer-group-id>" # Kelompok konsumen yang dibuat di Langkah 4
topics => ["<your-topic-name>"] # Topik yang dibuat di Langkah 2
consumer_threads => 12 # Sesuaikan dengan jumlah partisi topik
auto_offset_reset => "earliest" # Mulai dari pesan paling awal
}
}
output {
stdout { codec => rubydebug }
}Jalankan Logstash:
./logstash -f input.confParameter konfigurasi
| Parameter | Deskripsi | Contoh |
|---|---|---|
bootstrap_servers | Daftar endpoint VPC untuk instans ApsaraMQ for Kafka yang dipisahkan koma. Bisa berupa default endpoint atau SASL endpoint. | alikafka-pre-cn-zv\*\*\*\*-1-vpc.alikafka.aliyuncs.com:9092 |
group_id | Identifier kelompok konsumen. | logstash_group |
topics | Nama satu atau beberapa topik yang akan dikonsumsi. | logstash_test |
consumer_threads | Jumlah thread konsumen. Kami merekomendasikan menyetel nilai ini sesuai dengan jumlah partisi topik. | 12 |
auto_offset_reset | Strategi reset offset. earliest: konsumsi dari pesan pertama yang tersedia. latest: hanya konsumsi pesan baru. | earliest |
Verifikasi konsumsi pesan
Setelah Logstash dimulai, pesan yang dikonsumsi akan dicetak ke stdout dalam format debug Ruby:

Lihat juga
Kafka input plugin — Referensi lengkap parameter untuk plugin input Kafka Logstash.
Comparison among endpoints — Perbedaan antara jenis endpoint ApsaraMQ for Kafka.