All Products
Search
Document Center

ApsaraMQ for Kafka:Kirim pesan dari Logstash ke ApsaraMQ for Kafka melalui Internet

Last Updated:Mar 12, 2026

Logstash dapat meneruskan data log dan event ke instans ApsaraMQ for Kafka melalui Internet menggunakan otentikasi SASL_SSL. Panduan ini memandu Anda melalui pengambilan titik akhir, pembuatan topik, konfigurasi Logstash, dan verifikasi pesan.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Langkah 1: Dapatkan titik akhir dan kredensial

Logstash terhubung ke ApsaraMQ for Kafka melalui titik akhir SSL. Ambil titik akhir dan kredensial SASL dari Konsol.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.

  3. Pada halaman Instances, klik nama instans target.

  4. Pada halaman Instance Details, kumpulkan informasi berikut: endpoint

    • Bagian Endpoint Information: Salin titik akhir SSL (port 9093).

    • Bagian Configuration Information: Catat Username dan Password.

Catatan

Untuk detail mengenai jenis titik akhir, lihat Comparison among endpoints.

Langkah 2: Buat topik

Buat topik untuk menerima pesan yang dikirim oleh Logstash.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.

    Penting

    Buat topik di wilayah yang sama dengan instans Elastic Compute Service (ECS) Anda. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen Anda berjalan pada instans ECS di wilayah China (Beijing), buat topik tersebut juga di China (Beijing).

  3. Pada halaman Instances, klik nama instans target.

  4. Pada panel navigasi sebelah kiri, klik Topics.

  5. Pada halaman Topics, klik Create Topic.

  6. Pada panel Create Topic, konfigurasikan properti topik lalu klik OK. Setelah topik dibuat, topik tersebut akan muncul di halaman Topics.

    ParameterDescriptionExample
    NameNama topik.demo
    DescriptionDeskripsi singkat mengenai topik.demo test
    PartitionsJumlah partisi.12
    Storage EngineJenis mesin penyimpanan. Hanya dapat dikonfigurasi pada instans Edisi Profesional. Edisi Standar secara default menggunakan Cloud Storage.
    - Cloud Storage: Menggunakan disk Alibaba Cloud dengan 3 replika terdistribusi. Latensi rendah, performa tinggi, daya tahan panjang, dan keandalan tinggi. Wajib digunakan untuk instans Standard (High Write).
    - Local Storage: Menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dengan 3 replika terdistribusi.

    Cloud Storage
    Message TypePerilaku pengurutan pesan.
    - Normal Message: Pesan dengan kunci yang sama masuk ke partisi yang sama sesuai urutan pengiriman. Urutan mungkin tidak dipertahankan saat terjadi kegagalan broker. Dipilih otomatis ketika Storage Engine adalah Cloud Storage.
    - Partitionally Ordered Message: Pesan dengan kunci yang sama masuk ke partisi yang sama sesuai urutan pengiriman. Urutan dipertahankan selama kegagalan broker, tetapi partisi yang terpengaruh tidak tersedia hingga dipulihkan. Dipilih otomatis ketika Storage Engine adalah Local Storage.

    Normal Message
    Log Cleanup PolicyHanya dapat dikonfigurasi ketika Storage Engine adalah Local Storage (Edisi Profesional).
    - Delete: Kebijakan default. Menyimpan pesan hingga periode retensi maksimum. Menghapus pesan tertua ketika penyimpanan melebihi 85%.
    - Compact: Menyimpan hanya nilai terbaru untuk setiap kunci. Anda hanya dapat menggunakan topik dengan kompaksi log pada komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos.

    Compact
    TagTag opsional yang dilampirkan ke topik.demo

Langkah 3: Konfigurasikan dan jalankan Logstash

Siapkan sertifikat SSL, kredensial SASL, dan konfigurasi output Logstash di server Anda, lalu kirim pesan uji coba.

Unduh sertifikat SSL

Beralih ke direktori bin Logstash dan unduh sertifikat truststore:

cd <logstash-install-dir>/bin
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks

Buat file konfigurasi JAAS

Buat file bernama jaas.conf di direktori bin Logstash dengan konten berikut:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};

Ganti placeholder dengan nilai aktual Anda:

PlaceholderDescriptionExample
<your-username>Username dari bagian Configuration Information.alikafka_pre-cn-v0h1\*\*\*
<your-password>Password dari bagian Configuration Information.GQiSmqbQVe3b9hdKLDcIlkrBK6\*\*\*
Catatan

Pengaturan jaas_path berlaku untuk seluruh JVM. Jika Anda menjalankan beberapa output Kafka dalam satu instans Logstash dan memerlukan kredensial berbeda untuk masing-masing, gunakan parameter inline sasl_jaas_config sebagai gantinya. Lihat referensi Kafka output plugin untuk detail selengkapnya.

Buat file konfigurasi output Logstash

Buat file bernama output.conf di direktori bin Logstash dengan konten berikut:

input {
    stdin {}
}

output {
    stdout { codec => json }
    kafka {
        bootstrap_servers => "<your-endpoint>"
        topic_id => "<your-topic>"
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        jaas_path => "<logstash-install-dir>/bin/jaas.conf"
        ssl_truststore_password => "KafkaOnsClient"
        ssl_truststore_location => "<logstash-install-dir>/bin/kafka.client.truststore.jks"
        ssl_endpoint_identification_algorithm => ""
    }
}

Ganti placeholder berikut dengan nilai aktual Anda:

PlaceholderDescriptionExample
<your-endpoint>Titik akhir SSL (port 9093) dari bagian Endpoint Information.alikafka-pre-cn-zv\*\*\*\*\*-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-3.alikafka.aliyuncs.com:9093
<your-topic>Nama topik yang dibuat pada Langkah 2.logstash_test
<logstash-install-dir>Jalur mutlak ke direktori instalasi Logstash Anda./home/logstash-7.6.2

Parameter sisanya menggunakan nilai tetap. Jangan ubah nilai-nilai tersebut:

ParameterNilai tetapDescription
security_protocolSASL_SSLProtokol keamanan untuk koneksi Internet.
sasl_mechanismPLAINMekanisme otentikasi SASL.
ssl_truststore_passwordKafkaOnsClientKata sandi untuk sertifikat truststore.
ssl_endpoint_identification_algorithm"" (string kosong)Diperlukan untuk Logstash 6.x dan versi lebih baru. Menonaktifkan verifikasi hostname.

Kirim pesan uji coba

  1. Jalankan Logstash dengan konfigurasi output:

       ./logstash -f output.conf
  2. Setelah Logstash mulai berjalan, ketik test lalu tekan Enter. Output stdout menampilkan pesan secara lokal dalam format JSON, sedangkan output kafka mengirimkannya ke instans ApsaraMQ for Kafka Anda.

    output_result

Verifikasi hasil

Konfirmasi bahwa pesan telah sampai ke instans ApsaraMQ for Kafka Anda dengan memeriksa status partisi dan mengkueri pesan di Konsol.

Periksa status partisi

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.

  3. Pada halaman Instances, klik nama instans target.

  4. Pada panel navigasi sebelah kiri, klik Topics.

  5. Klik nama topik, lalu klik tab Partition Status pada halaman Topic Details.

    ParameterDescription
    Partition IDID partisi.
    Minimum OffsetOffset paling awal dalam partisi.
    Maximum OffsetOffset terbaru dalam partisi.
    MessagesJumlah total pesan dalam partisi.
    Last Updated AtWaktu saat pesan terbaru disimpan.

    partition status

Kueri pesan berdasarkan offset

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.

  3. Pada halaman Instances, klik nama instans target.

  4. Pada panel navigasi sebelah kiri, klik Message Query.

  5. Dari daftar drop-down Search Method, pilih Search by offset.

  6. Pilih topik dari daftar drop-down Topic, pilih partisi dari daftar drop-down Partition, masukkan nilai offset pada bidang Offset, lalu klik Search. Konsol mengembalikan semua pesan dengan offset lebih besar dari atau sama dengan nilai yang ditentukan. Sebagai contoh, mengatur Partition ke 5 dan Offset ke 5 akan mengembalikan semua pesan dari partisi 5 dengan offset 5 dan seterusnya.

    ParameterDescription
    PartitionPartisi tempat pesan disimpan.
    OffsetOffset pesan dalam partisi.
    KeyKunci pesan, ditampilkan sebagai string.
    ValueIsi pesan, ditampilkan sebagai string.
    Created AtTimestamp saat pesan dikirim. Jika Anda menentukan nilai untuk bidang timestamp ProducerRecord, nilai yang ditentukan akan ditampilkan. Jika Anda tidak menentukan nilai, waktu sistem lokal saat pesan dikirim akan ditampilkan. Jika bidang timestamp diatur ke 0 atau nilai tidak valid, waktu akan ditampilkan dalam format 1970/x/x x:x:x. Klien pada ApsaraMQ for Kafka versi 0.9 atau lebih lama tidak dapat mengatur bidang ini.
    ActionsDownload Key: Unduh kunci pesan. Download Value: Unduh isi pesan. Konsol menampilkan hingga 1 KB per pesan. Unduh pesan untuk melihat konten yang melebihi 1 KB. Maksimal 10 MB pesan dapat diunduh sekaligus.

Referensi