All Products
Search
Document Center

ApsaraMQ for Kafka:Gunakan ApsaraMQ for Kafka sebagai input Logstash dalam VPC

Last Updated:Mar 12, 2026

Logstash memproses data melalui pipeline yang terdiri atas input, filter, dan output. Dengan menghubungkan instans ApsaraMQ for Kafka sebagai input, Logstash mengonsumsi pesan dari topik Kafka dan meneruskannya ke tujuan seperti Elasticsearch, penyimpanan file, atau stdout.

Arsitektur ini memberikan dua keuntungan:

  • Pemrosesan asinkron: Kafka menyimpan sementara pesan sehingga Logstash dapat memprosesnya sesuai kecepatannya sendiri, mencegah kehilangan data selama lonjakan lalu lintas.

  • Decoupling: Jika sistem downstream seperti Elasticsearch mati, Kafka tetap menyimpan pesan hingga sistem tersebut pulih. Produsen upstream tidak terpengaruh.

Topik ini menjelaskan cara mengonfigurasi Logstash untuk mengonsumsi pesan dari instans ApsaraMQ for Kafka melalui koneksi virtual private cloud (VPC).

Prasyarat

Sebelum memulai, pastikan Anda telah:

  • Membeli dan men-deploy instans ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Purchase and deploy a VPC-connected instance.

  • Logstash telah terinstal. Untuk informasi selengkapnya, lihat Unduh Logstash.

  • Menginstal Java Development Kit (JDK) 8. Untuk informasi lebih lanjut, lihat halaman unduhan Java 8.

  • Memastikan plugin input Kafka untuk Logstash tersedia. Jalankan perintah berikut untuk memverifikasi. Jika plugin tidak terdaftar, instal plugin tersebut:

      bin/logstash-plugin list | grep logstash-input-kafka
      bin/logstash-plugin install logstash-input-kafka

Langkah 1: Dapatkan endpoint

Logstash terhubung ke ApsaraMQ for Kafka melalui endpoint VPC. ApsaraMQ for Kafka menyediakan dua jenis endpoint VPC:

Jenis endpointPortKapan digunakan
Default endpoint9092Akses standar tanpa autentikasi
Simple Authentication and Security Layer (SASL) endpoint9094Akses terotentikasi. Memerlukan fitur access control list (ACL) diaktifkan. Untuk informasi lebih lanjut, lihat Enable the ACL feature.

Untuk informasi lebih lanjut tentang perbedaan endpoint, lihat Comparison among endpoints.

Untuk mendapatkan endpoint:

  1. Login ke ApsaraMQ for Kafka console.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda dideploy.

  3. Di halaman Instances, klik nama instans target.

  4. Di halaman Instance Details, temukan endpoint pada bagian Endpoint Information. Jika Anda berencana menggunakan endpoint SASL, catat nilai Username dan Password pada bagian Configuration Information.

endpoint

Langkah 2: Buat topik

Buat topik untuk menyimpan pesan yang akan dikonsumsi oleh Logstash.

Penting

Buat topik di wilayah yang sama dengan instans Elastic Compute Service (ECS) Anda. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen berjalan pada instans ECS di wilayah China (Beijing), topik juga harus berada di wilayah China (Beijing).

  1. Login ke ApsaraMQ for Kafka console.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda dideploy.

  3. Di halaman Instances, klik nama instans target.

  4. Di panel navigasi sebelah kiri, klik Topics.

  5. Di halaman Topics, klik Create Topic.

  6. Di panel Create Topic, konfigurasikan parameter berikut lalu klik OK.

ParameterDeskripsiContoh
NameNama topik.demo
DescriptionDeskripsi topik.demo test
PartitionsJumlah partisi.12
Storage EngineJenis mesin penyimpanan. Hanya tersedia untuk instans Edisi Profesional. Instans Edisi Standar secara default menggunakan Cloud Storage. Opsi: Cloud Storage -- Menggunakan disk Alibaba Cloud dengan tiga replika dalam mode terdistribusi. Memberikan latensi rendah, performa tinggi, dan keandalan tinggi. Wajib digunakan ketika edisi instans adalah Standard (High Write). Local Storage -- Menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dengan tiga replika dalam mode terdistribusi.Cloud Storage
Message TypeJenis pengurutan pesan. Normal Message -- Pesan dengan kunci yang sama disimpan di partisi yang sama sesuai urutan pengiriman. Pengurutan partisi mungkin tidak dipertahankan selama kegagalan broker. Dipilih secara otomatis ketika Storage Engine adalah Cloud Storage. Partitionally Ordered Message -- Pesan dengan kunci yang sama tetap terurut di partisi yang sama bahkan selama kegagalan broker. Beberapa partisi mungkin menjadi tidak tersedia sementara. Dipilih secara otomatis ketika Storage Engine adalah Local Storage.Normal Message
Log Cleanup PolicyKebijakan pembersihan log. Hanya tersedia ketika Storage Engine adalah Local Storage (hanya untuk Edisi Profesional). Delete -- Kebijakan default. Menyimpan pesan berdasarkan periode retensi maksimum. Menghapus pesan paling awal ketika penggunaan penyimpanan melebihi 85%. Compact -- Hanya menyimpan nilai terbaru untuk setiap kunci pesan. Digunakan oleh komponen seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos.
Penting

Anda hanya dapat menggunakan topik dengan log-compacted pada komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry.

Compact
TagTag yang dilampirkan ke topik.demo

Setelah topik dibuat, topik tersebut akan muncul di halaman Topics.

Langkah 3: Kirim pesan uji

Kirim pesan uji untuk memverifikasi bahwa topik siap dikonsumsi.

  1. Login ke ApsaraMQ for Kafka console.

  2. Navigasi ke instans target lalu klik Topics di panel navigasi sebelah kiri.

  3. Klik nama topik, lalu klik Send Message di pojok kanan atas halaman Topic Details.

  4. Di panel Start to Send and Consume Message, pilih Sending Method (Console, Docker, atau SDK) lalu kirim pesan uji. Jika Anda ingin mengirim pesan ke partisi tertentu, Anda dapat menentukan ID partisi tersebut. Untuk informasi cara menanyakan ID partisi, lihat View partition status.

Langkah 4: Buat kelompok konsumen

Buat kelompok konsumen agar Logstash dapat melacak offset konsumsinya.

  1. Login ke ApsaraMQ for Kafka console.

  2. Navigasi ke instans target lalu klik Groups di panel navigasi sebelah kiri.

  3. Di halaman Groups, klik Create Group.

  4. Di panel Create Group, masukkan Group ID dan Description, lampirkan tag jika diperlukan, lalu klik OK.

Setelah kelompok konsumen dibuat, kelompok tersebut akan muncul di halaman Groups.

Langkah 5: Konfigurasi dan jalankan Logstash

  1. Navigasi ke direktori instalasi Logstash.

  2. Buat file konfigurasi bernama input.conf di direktori bin:

input {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint>"       # Endpoint VPC, contoh: alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092
    group_id          => "<your-consumer-group-id>"     # Kelompok konsumen yang dibuat di Langkah 4
    topics            => ["<your-topic-name>"]          # Topik yang dibuat di Langkah 2
    consumer_threads  => 12                             # Sesuaikan dengan jumlah partisi topik
    auto_offset_reset => "earliest"                     # Mulai dari pesan paling awal
  }
}
output {
  stdout { codec => rubydebug }
}
  1. Jalankan Logstash:

./logstash -f input.conf

Parameter konfigurasi

ParameterDeskripsiContoh
bootstrap_serversDaftar endpoint VPC untuk instans ApsaraMQ for Kafka yang dipisahkan koma. Bisa berupa default endpoint atau SASL endpoint.alikafka-pre-cn-zv\*\*\*\*-1-vpc.alikafka.aliyuncs.com:9092
group_idIdentifier kelompok konsumen.logstash_group
topicsNama satu atau beberapa topik yang akan dikonsumsi.logstash_test
consumer_threadsJumlah thread konsumen. Kami merekomendasikan menyetel nilai ini sesuai dengan jumlah partisi topik.12
auto_offset_resetStrategi reset offset. earliest: konsumsi dari pesan pertama yang tersedia. latest: hanya konsumsi pesan baru.earliest

Verifikasi konsumsi pesan

Setelah Logstash dimulai, pesan yang dikonsumsi akan dicetak ke stdout dalam format debug Ruby:

logstash_5

Lihat juga