All Products
Search
Document Center

ApsaraMQ for Kafka:Menghubungkan Logstash sebagai output di lingkungan VPC

Last Updated:Nov 11, 2025

ApsaraMQ for Kafka dapat dihubungkan ke Logstash sebagai output. Topik ini menjelaskan cara menggunakan Logstash untuk mengirim pesan ke ApsaraMQ for Kafka di lingkungan VPC.

Prasyarat

  • Anda telah membeli dan men-deploy instans ApsaraMQ for Kafka. Topik ini menggunakan contoh instans non-Serverless. Untuk informasi selengkapnya, lihat Menghubungkan ke instansi terhubung-VPC.

  • Anda telah mengunduh dan menginstal Logstash. Untuk informasi selengkapnya, lihat Unduh Logstash.

  • Anda telah mengunduh dan menginstal JDK 8. Untuk informasi selengkapnya, lihat Unduh JDK 8.

Langkah 1: Mendapatkan titik akhir

Logstash menggunakan titik akhir ApsaraMQ for Kafka untuk menghubungkan ke instans ApsaraMQ for Kafka.

Catatan

ApsaraMQ for Kafka mendukung titik akhir VPC berikut:

  • Titik akhir default: nomor port adalah 9092.

  • Titik akhir SASL: nomor port adalah 9094. Untuk menggunakan titik akhir SASL, Anda harus mengaktifkan fitur daftar kontrol akses (ACL). Untuk informasi selengkapnya, lihat Mengaktifkan fitur ACL.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Pada halaman Instances, klik nama instans yang ingin Anda hubungkan sebagai output ke Logstash.

  4. Di bagian Endpoint Information pada halaman Instance Details, lihat titik akhir instans tersebut. Di bagian Configuration Information, peroleh nilai parameter Username dan Password.

    endpoint

    Catatan

    Untuk informasi tentang perbedaan antara berbagai jenis titik akhir, lihat Perbandingan antar titik akhir.

Langkah 2: Membuat topik

Buat topik untuk menyimpan pesan.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

    Penting

    Anda harus membuat topik di wilayah tempat instans Elastic Compute Service (ECS) Anda dideploy. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan pada instans ECS yang dideploy di wilayah Tiongkok (Beijing), topik juga harus dibuat di wilayah Tiongkok (Beijing).

  3. Pada halaman Instances, klik nama instans yang ingin Anda kelola.

  4. Di panel navigasi sebelah kiri, klik Topics.

  5. Pada halaman Topics, klik Create Topic.

  6. Di panel Create Topic, tentukan properti topik dan klik OK.

    Parameter

    Deskripsi

    Contoh

    Name

    Nama topik.

    demo

    Description

    Deskripsi topik.

    demo test

    Partitions

    Jumlah partisi dalam topik.

    12

    Storage Engine

    Catatan

    Anda hanya dapat menentukan jenis mesin penyimpanan jika menggunakan instans Edisi Profesional non-serverless. Untuk jenis instans lainnya, Cloud Storage dipilih secara default.

    Jenis mesin penyimpanan yang digunakan untuk menyimpan pesan dalam topik.

    ApsaraMQ for Kafka mendukung jenis mesin penyimpanan berikut:

    • Cloud Storage: Jika Anda memilih nilai ini, sistem menggunakan disk Alibaba Cloud untuk topik dan menyimpan data dalam tiga replika secara terdistribusi. Mesin penyimpanan ini memiliki latensi rendah, performa tinggi, daya tahan lama, dan keandalan tinggi. Jika Anda menyetel parameter Instance Edition ke Standard (High Write) saat membuat instans, Anda hanya dapat menyetel parameter ini ke Cloud Storage.

    • Local Storage: Jika Anda memilih nilai ini, sistem menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dan menyimpan data dalam tiga replika secara terdistribusi.

    Cloud Storage

    Message Type

    Jenis pesan topik. Nilai yang valid:

    • Normal Message: Secara default, pesan yang memiliki kunci yang sama disimpan dalam partisi yang sama sesuai urutan pengiriman pesan. Jika broker dalam kluster gagal, urutan pesan yang disimpan dalam partisi mungkin tidak dipertahankan. Jika Anda menyetel parameter Storage Engine ke Cloud Storage, parameter ini secara otomatis disetel ke Normal Message.

    • Partitionally Ordered Message: Secara default, pesan yang memiliki kunci yang sama disimpan dalam partisi yang sama sesuai urutan pengiriman pesan. Jika broker dalam kluster gagal, pesan tetap disimpan dalam partisi sesuai urutan pengiriman pesan. Pesan dalam beberapa partisi tidak dapat dikirim hingga partisi tersebut dipulihkan. Jika Anda menyetel parameter Storage Engine ke Local Storage, parameter ini secara otomatis disetel ke Partitionally Ordered Message.

    Normal Message

    Log Cleanup Policy

    Kebijakan pembersihan log yang digunakan oleh topik.

    Jika Anda menyetel parameter Storage Engine ke Local Storage, Anda harus mengonfigurasi parameter Log Cleanup Policy. Anda hanya dapat menyetel parameter Mesin Penyimpanan ke Local Storage jika menggunakan instans ApsaraMQ for Kafka Edisi Profesional.

    ApsaraMQ for Kafka menyediakan kebijakan pembersihan log berikut:

    • Delete: kebijakan pembersihan log default. Jika ruang penyimpanan yang tersedia dalam sistem mencukupi, pesan akan disimpan berdasarkan periode retensi maksimum. Setelah penggunaan penyimpanan melebihi 85%, sistem akan menghapus pesan yang paling awal disimpan untuk memastikan ketersediaan layanan.

    • Compact: kebijakan log compaction yang digunakan dalam Apache Kafka. Log compaction memastikan bahwa nilai terbaru dipertahankan untuk pesan yang memiliki kunci yang sama. Kebijakan ini cocok untuk skenario seperti memulihkan sistem yang gagal atau memuat ulang cache setelah sistem dimulai ulang. Misalnya, ketika Anda menggunakan Kafka Connect atau Confluent Schema Registry, Anda harus menyimpan informasi tentang status dan konfigurasi sistem dalam topik dengan log-compacted.

      Penting

      Anda hanya dapat menggunakan topik dengan log-compacted dalam komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi selengkapnya, lihat aliware-kafka-demos.

    Compact

    Tag

    Tag yang ingin Anda sambungkan ke topik.

    demo

    Setelah topik dibuat, Anda dapat melihat topik tersebut di halaman Topics.

Langkah 3: Menggunakan Logstash untuk mengirim pesan

Pada mesin tempat Anda menginstal Logstash, jalankan Logstash dan kirim pesan ke topik yang telah Anda buat.

  1. Jalankan perintah cd untuk menuju ke direktori bin Logstash.

  2. Buat file konfigurasi output.conf.

    1. Jalankan perintah vim output.conf untuk membuat file konfigurasi kosong.

    2. Tekan tombol i untuk masuk ke mode insert.

    3. Masukkan konten berikut.

      input {
          input {
            stdin{}
        }
      }
      
      output {
         kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"
              topic_id => "logstash_test"
             }
      }

      Parameter

      Deskripsi

      Contoh

      bootstrap_servers

      ApsaraMQ for Kafka menyediakan titik akhir VPC berikut:

      • Titik akhir default

      • Titik akhir SASL

      alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

      topic_id

      Nama topik.

      logstash_test

    4. Tekan tombol Esc untuk kembali ke mode CLI.

    5. Tekan tombol : untuk masuk ke mode bottom line. Masukkan wq dan tekan tombol Enter untuk menyimpan file dan keluar.

  3. Kirim pesan ke topik yang telah Anda buat.

    1. Jalankan ./logstash -f output.conf.

    2. Masukkan test dan tekan Enter.

      Hasil berikut dikembalikan.

      result

Langkah 4: Melihat partisi topik

Lihat status partisi untuk memeriksa pesan yang telah dikirim ke topik.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Pada halaman Instances, klik nama instans yang telah Anda hubungkan sebagai output ke Logstash.

  4. Di panel navigasi sebelah kiri, klik Topics.

  5. Pada halaman Topics, klik nama topik yang ingin Anda kelola. Pada halaman Topic Details, klik tab Partition Status.Name

    Tabel 1. Parameter yang termasuk dalam status partisi

    Parameter

    Deskripsi

    ID Partisi

    ID partisi.

    Offset Minimum

    Offset minimum dalam partisi.

    Offset Maksimum

    Offset maksimum dalam partisi.

    Pesan

    Jumlah pesan dalam partisi.

    Terakhir Diperbarui Pada

    Waktu saat pesan terakhir dalam partisi disimpan.

    分区状态信息

Langkah 5: Menanyakan pesan berdasarkan offset

Anda dapat menanyakan pesan yang dikirim berdasarkan ID partisi dan offset-nya.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instans ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Pada halaman Instances, klik nama instans yang ingin Anda kelola.

  4. Di panel navigasi sebelah kiri, klik Message Query.

  5. Pada halaman Message Query, pilih Search by offset dari daftar drop-down Search Method.

  6. Pilih topik dari daftar drop-down Topic dan partisi dari daftar drop-down Partition, masukkan nilai offset di kolom Offset, lalu klik Search.

    Pesan yang nilai offset-nya lebih besar dari atau sama dengan nilai offset yang ditentukan akan ditampilkan. Misalnya, jika Anda menentukan 5 sebagai nilai parameter Partisi dan Offset, sistem akan menanyakan pesan yang offset-nya sama dengan atau lebih besar dari 5 dari Partisi 5.

    Tabel 2. Parameter yang termasuk dalam hasil pencarian pesan

    Parameter

    Deskripsi

    Partition

    Partisi tempat pesan diambil.

    Offset

    Offset pesan.

    Key

    Kunci pesan. Kunci dikonversi menjadi string.

    Value

    Nilai pesan, yang juga dikenal sebagai konten pesan. Nilai pesan dikonversi menjadi string.

    Created At

    Titik waktu saat pesan dikirim. Nilainya adalah timestamp yang direkam klien saat pesan dikirim atau nilai kolom timestamp yang Anda tentukan untuk objek ProducerRecord.

    Catatan
    • Jika Anda menentukan nilai untuk kolom timestamp, nilai yang ditentukan akan ditampilkan.

    • Jika Anda tidak menentukan nilai untuk kolom timestamp, waktu sistem saat pesan dikirim akan ditampilkan.

    • Nilai dalam format 1970/x/x x:x:x menunjukkan bahwa kolom timestamp ditentukan sebagai 0 atau nilai yang tidak valid.

    • Anda tidak dapat menentukan kolom timestamp pada klien versi ApsaraMQ for Kafka 0.9 atau lebih awal.

    Actions

    • Klik Download Key untuk mengunduh kunci pesan.

    • Klik Download Value untuk mengunduh konten pesan.

    Penting
    • Hingga 1 KB konten untuk setiap pesan yang diambil dapat ditampilkan di Konsol ApsaraMQ for Kafka. Jika pesan yang diambil melebihi ukuran 1 KB, sistem secara otomatis memotong kontennya. Jika Anda ingin melihat konten pesan lengkap, unduh pesan tersebut.

    • Anda dapat mengunduh hingga 10 MB pesan yang diambil sekaligus. Jika pesan yang diambil melebihi ukuran 10 MB, hanya 10 MB pertama dari konten pesan yang dapat diunduh.

Informasi lebih lanjut

Untuk informasi selengkapnya tentang pengaturan parameter, lihat Plugin output Kafka.