All Products
Search
Document Center

ApsaraMQ for Kafka:Menghubungkan ke Logstash sebagai input melalui Internet

Last Updated:Nov 11, 2025

Anda dapat menghubungkan instans ApsaraMQ for Kafka ke Logstash sebagai input. Topik ini menjelaskan cara menggunakan Logstash untuk mengonsumsi pesan dari instans ApsaraMQ for Kafka melalui Internet.

Prasyarat

Sebelum memulai, selesaikan tugas-tugas berikut:

Langkah 1: Mendapatkan informasi akses

Logstash terhubung ke ApsaraMQ for Kafka menggunakan titik akhir. Nama pengguna dan kata sandi instans ApsaraMQ for Kafka diperlukan untuk otentikasi keamanan.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Pada halaman Instances, klik nama instans yang ingin Anda hubungkan ke Logstash sebagai input.

  4. Di bagian Endpoint Information pada halaman Instance Details, lihat titik akhir instans tersebut. Di bagian Configuration Information, peroleh nilai parameter Username dan Password.

    endpoint

    Catatan

    Untuk informasi tentang perbedaan antara berbagai jenis titik akhir, lihat Perbandingan antar titik akhir.

Langkah 2: Membuat topik

Buat topik untuk menyimpan pesan.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

    Penting

    Anda harus membuat topik di wilayah tempat instans Elastic Compute Service (ECS) Anda dideploy. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan pada instans ECS yang dideploy di wilayah China (Beijing), topik juga harus dibuat di wilayah China (Beijing).

  3. Pada halaman Instances, klik nama instans yang ingin Anda kelola.

  4. Di panel navigasi sebelah kiri, klik Topics.

  5. Pada halaman Topics, klik Create Topic.

  6. Di panel Create Topic, tentukan properti topik dan klik OK.

    Parameter

    Deskripsi

    Contoh

    Name

    Nama topik.

    demo

    Description

    Deskripsi topik.

    demo test

    Partitions

    Jumlah partisi dalam topik.

    12

    Storage Engine

    Catatan

    Anda hanya dapat menentukan jenis mesin penyimpanan jika menggunakan instans Edisi Profesional non-serverless. Untuk jenis instans lainnya, Cloud Storage dipilih secara default.

    Jenis mesin penyimpanan yang digunakan untuk menyimpan pesan dalam topik.

    ApsaraMQ for Kafka mendukung jenis mesin penyimpanan berikut:

    • Cloud Storage: Jika Anda memilih nilai ini, sistem menggunakan disk Alibaba Cloud untuk topik dan menyimpan data dalam tiga replika secara terdistribusi. Mesin penyimpanan ini memiliki latensi rendah, performa tinggi, daya tahan lama, dan keandalan tinggi. Jika Anda menyetel parameter Instance Edition ke Standard (High Write) saat membuat instans, Anda hanya dapat menyetel parameter ini ke Cloud Storage.

    • Local Storage: Jika Anda memilih nilai ini, sistem menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dan menyimpan data dalam tiga replika secara terdistribusi.

    Cloud Storage

    Message Type

    Jenis pesan topik. Nilai yang valid:

    • Normal Message: Secara default, pesan yang memiliki kunci yang sama disimpan dalam partisi yang sama sesuai urutan pengiriman pesan. Jika broker dalam kluster gagal, urutan pesan yang disimpan dalam partisi mungkin tidak dipertahankan. Jika Anda menyetel parameter Storage Engine ke Cloud Storage, parameter ini akan secara otomatis disetel ke Normal Message.

    • Partitionally Ordered Message: Secara default, pesan yang memiliki kunci yang sama disimpan dalam partisi yang sama sesuai urutan pengiriman pesan. Jika broker dalam kluster gagal, pesan tetap disimpan dalam partisi sesuai urutan pengiriman pesan. Pesan dalam beberapa partisi tidak dapat dikirim hingga partisi tersebut dipulihkan. Jika Anda menyetel parameter Storage Engine ke Local Storage, parameter ini akan secara otomatis disetel ke Partitionally Ordered Message.

    Normal Message

    Log Cleanup Policy

    Kebijakan pembersihan log yang digunakan oleh topik.

    Jika Anda menyetel parameter Storage Engine ke Local Storage, Anda harus mengonfigurasi parameter Log Cleanup Policy. Anda hanya dapat menyetel parameter Storage Engine ke Local Storage jika menggunakan instans ApsaraMQ for Kafka Edisi Profesional.

    ApsaraMQ for Kafka menyediakan kebijakan pembersihan log berikut:

    • Delete: kebijakan pembersihan log default. Jika ruang penyimpanan yang tersedia mencukupi, pesan dipertahankan berdasarkan periode retensi maksimum. Setelah penggunaan penyimpanan melebihi 85%, sistem menghapus pesan yang paling awal disimpan untuk memastikan ketersediaan layanan.

    • Compact: kebijakan log compaction yang digunakan dalam Apache Kafka. Log compaction memastikan bahwa nilai terbaru dipertahankan untuk pesan yang memiliki kunci yang sama. Kebijakan ini cocok untuk skenario seperti memulihkan sistem yang gagal atau memuat ulang cache setelah sistem dimulai ulang. Misalnya, ketika Anda menggunakan Kafka Connect atau Confluent Schema Registry, Anda harus menyimpan informasi tentang status dan konfigurasi sistem dalam topik dengan log-compacted.

      Penting

      Anda hanya dapat menggunakan topik dengan log-compacted dalam komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi selengkapnya, lihat aliware-kafka-demos.

    Compact

    Tag

    Tag yang ingin Anda sambungkan ke topik.

    demo

    Setelah topik dibuat, Anda dapat melihat topik tersebut di halaman Topics.

Langkah 3: Mengirim pesan

Kirim pesan ke topik yang telah dibuat.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Pada halaman Instances, klik nama instans yang ingin Anda kelola.

  4. Di panel navigasi sebelah kiri, klik Topics.

  5. Pada halaman Topics, klik nama topik yang ingin Anda kelola. Di pojok kanan atas halaman Topic Details, klik Send Message.Name

  6. Di panel Start to Send and Consume Message, konfigurasikan parameter untuk mengirim pesan uji coba.

    • Jika Anda menyetel parameter Sending Method ke Console, lakukan langkah-langkah berikut:

      1. Di bidang Message Key, masukkan kunci pesan. Contoh: demo.

      2. Di bidang Message Content, masukkan konten pesan. Contoh: {"key": "test"}.

      3. Konfigurasikan parameter Send to Specified Partition untuk menentukan apakah akan mengirim pesan uji coba ke partisi tertentu.

        • Jika Anda ingin mengirim pesan uji coba ke partisi tertentu, klik Yes dan masukkan ID partisi di bidang Partition ID. Contoh: 0. Untuk informasi tentang cara menanyakan ID partisi, lihat View partition status.

        • Jika Anda tidak ingin mengirim pesan uji coba ke partisi tertentu, klik No.

      4. Gunakan SDK ApsaraMQ for Kafka atau jalankan perintah Docker yang ditampilkan di panel Start to Send and Consume Message untuk berlangganan pesan uji coba.

    • Jika Anda menyetel parameter Sending Method ke Docker, lakukan langkah-langkah berikut untuk menjalankan kontainer Docker:

      1. Jalankan perintah Docker yang ditampilkan di bagian Run the Docker container to produce a sample message untuk mengirim pesan uji coba.

      2. Jalankan perintah Docker yang ditampilkan di bagian How do I consume a message after the message is sent? untuk berlangganan pesan uji coba.

    • Jika Anda menyetel parameter Sending Method ke SDK, pilih SDK untuk bahasa pemrograman atau framework yang diperlukan serta metode akses untuk mengirim dan berlangganan pesan uji coba.

Langkah 4: Membuat grup

Buat Group untuk Logstash.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Pada halaman Instances, klik nama instans yang ingin Anda kelola.

  4. Di panel navigasi sebelah kiri, klik Groups.

  5. Pada halaman Groups, klik Create Group.

  6. Di panel Create Group, masukkan nama grup di bidang Group ID dan deskripsi grup di bidang Description, sambungkan tag ke grup, lalu klik OK.

    Setelah membuat kelompok konsumen, Anda dapat melihat kelompok konsumen tersebut di halaman Groups.

Langkah 5: Menggunakan Logstash untuk mengonsumsi pesan

Jalankan Logstash pada mesin tempat Logstash diinstal dan konsumsi pesan dari topik yang telah dibuat.

  1. Jalankan perintah cd untuk beralih ke direktori bin Logstash.

  2. Jalankan perintah berikut untuk mengunduh file sertifikat kafka.client.truststore.jks.

    wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks
  3. Buat file konfigurasi bernama jaas.conf.

    1. Jalankan perintah vim jaas.conf untuk membuat file konfigurasi kosong.

    2. Tekan tombol i untuk memasuki mode insert.

    3. Masukkan konten berikut.

      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="XXX"
        password="XXX";
      };

      Parameter

      Deskripsi

      Contoh

      username

      Nama pengguna instans yang terhubung ke Internet dan VPC.

      alikafka_pre-cn-v0h1***

      password

      Kata sandi instans yang terhubung ke Internet dan VPC.

      GQiSmqbQVe3b9hdKLDcIlkrBK6***

    4. Tekan tombol Esc untuk kembali ke mode CLI.

    5. Tekan tombol : untuk memasuki mode bottom line. Masukkan wq dan tekan tombol Enter untuk menyimpan file dan keluar.

  4. Buat file konfigurasi bernama input.conf.

    1. Jalankan perintah vim input.conf untuk membuat file konfigurasi kosong.

    2. Tekan tombol i untuk memasuki mode insert.

    3. Masukkan konten berikut.

      input {
          kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"
              topics => ["logstash_test"]
      
              security_protocol => "SASL_SSL"
              sasl_mechanism => "PLAIN"
      
              jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
      
              ssl_truststore_password => "KafkaOnsClient"
              ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
      
              ssl_endpoint_identification_algorithm => ""
      
              group_id => "logstash_group"
              consumer_threads => 3
              auto_offset_reset => "earliest"
          }
      }
      
      output {
          stdout {
              codec => rubydebug
          }
      }

      Parameter

      Deskripsi

      Contoh

      bootstrap_servers

      Titik akhir Internet yang disediakan oleh ApsaraMQ for Kafka adalah titik akhir SSL.

      alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093

      topics

      Nama topik.

      logstash_test

      security_protocol

      Protokol keamanan. Nilai default adalah SASL_SSL. Anda tidak perlu mengubah nilai ini.

      SASL_SSL

      sasl_mechanism

      Mekanisme otentikasi keamanan. Nilai default adalah PLAIN. Anda tidak perlu mengubah nilai ini.

      PLAIN

      jaas_path

      Jalur file konfigurasi jaas.conf.

      /home/logstash-7.6.2/bin/jaas.conf

      ssl_truststore_password

      Kata sandi sertifikat kafka.client.truststore.jks. Nilai default adalah KafkaOnsClient. Anda tidak perlu mengubah nilai ini.

      KafkaOnsClient

      ssl_truststore_location

      Jalur sertifikat kafka.client.truststore.jks.

      /home/logstash-7.6.2/bin/kafka.client.truststore.jks

      ssl_endpoint_identification_algorithm

      Parameter ini diperlukan untuk Logstash 6.x dan versi yang lebih baru.

      Nilai kosong

      group_id

      Nama kelompok konsumen.

      logstash_group

      consumer_threads

      Jumlah thread konsumen. Kami menyarankan agar Anda menyetel parameter ini ke nilai yang sama dengan jumlah partisi dalam topik.

      3

      auto_offset_reset

      Menyetel ulang offset. Nilai yang valid:

      • earliest: membaca pesan paling awal.

      • latest: membaca pesan terbaru.

      earliest

    4. Tekan tombol Esc untuk kembali ke mode CLI.

    5. Tekan tombol : untuk memasuki mode bottom line. Masukkan wq dan tekan tombol Enter untuk menyimpan file dan keluar.

  5. Jalankan perintah berikut untuk mengonsumsi pesan.

    ./logstash -f input.conf

    Output berikut dikembalikan.

    result

Informasi lebih lanjut

Untuk informasi selengkapnya tentang pengaturan parameter, lihat Kafka input plugin.