Logstash dapat meneruskan data log dan event ke instans ApsaraMQ for Kafka melalui Internet menggunakan otentikasi SASL_SSL. Panduan ini memandu Anda melalui pengambilan titik akhir, pembuatan topik, konfigurasi Logstash, dan verifikasi pesan.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans ApsaraMQ for Kafka (tipe Internet dan VPC) yang telah dibeli dan dideploy. Lihat Beli dan deploy instans terhubung-Internet dan VPC
Logstash yang telah diinstal. Lihat Download Logstash
Java Development Kit (JDK) 8 yang telah diinstal. Lihat Download JDK 8
Langkah 1: Dapatkan titik akhir dan kredensial
Logstash terhubung ke ApsaraMQ for Kafka melalui titik akhir SSL. Ambil titik akhir dan kredensial SASL dari Konsol.
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.
Pada halaman Instances, klik nama instans target.
Pada halaman Instance Details, kumpulkan informasi berikut:

Bagian Endpoint Information: Salin titik akhir SSL (port 9093).
Bagian Configuration Information: Catat Username dan Password.
Untuk detail mengenai jenis titik akhir, lihat Comparison among endpoints.
Langkah 2: Buat topik
Buat topik untuk menerima pesan yang dikirim oleh Logstash.
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.
PentingBuat topik di wilayah yang sama dengan instans Elastic Compute Service (ECS) Anda. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen Anda berjalan pada instans ECS di wilayah China (Beijing), buat topik tersebut juga di China (Beijing).
Pada halaman Instances, klik nama instans target.
Pada panel navigasi sebelah kiri, klik Topics.
Pada halaman Topics, klik Create Topic.
Pada panel Create Topic, konfigurasikan properti topik lalu klik OK. Setelah topik dibuat, topik tersebut akan muncul di halaman Topics.
Parameter Description Example Name Nama topik. demo Description Deskripsi singkat mengenai topik. demo test Partitions Jumlah partisi. 12 Storage Engine Jenis mesin penyimpanan. Hanya dapat dikonfigurasi pada instans Edisi Profesional. Edisi Standar secara default menggunakan Cloud Storage.
- Cloud Storage: Menggunakan disk Alibaba Cloud dengan 3 replika terdistribusi. Latensi rendah, performa tinggi, daya tahan panjang, dan keandalan tinggi. Wajib digunakan untuk instans Standard (High Write).
- Local Storage: Menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dengan 3 replika terdistribusi.Cloud Storage Message Type Perilaku pengurutan pesan.
- Normal Message: Pesan dengan kunci yang sama masuk ke partisi yang sama sesuai urutan pengiriman. Urutan mungkin tidak dipertahankan saat terjadi kegagalan broker. Dipilih otomatis ketika Storage Engine adalah Cloud Storage.
- Partitionally Ordered Message: Pesan dengan kunci yang sama masuk ke partisi yang sama sesuai urutan pengiriman. Urutan dipertahankan selama kegagalan broker, tetapi partisi yang terpengaruh tidak tersedia hingga dipulihkan. Dipilih otomatis ketika Storage Engine adalah Local Storage.Normal Message Log Cleanup Policy Hanya dapat dikonfigurasi ketika Storage Engine adalah Local Storage (Edisi Profesional).
- Delete: Kebijakan default. Menyimpan pesan hingga periode retensi maksimum. Menghapus pesan tertua ketika penyimpanan melebihi 85%.
- Compact: Menyimpan hanya nilai terbaru untuk setiap kunci. Anda hanya dapat menggunakan topik dengan kompaksi log pada komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos.Compact Tag Tag opsional yang dilampirkan ke topik. demo
Langkah 3: Konfigurasikan dan jalankan Logstash
Siapkan sertifikat SSL, kredensial SASL, dan konfigurasi output Logstash di server Anda, lalu kirim pesan uji coba.
Unduh sertifikat SSL
Beralih ke direktori bin Logstash dan unduh sertifikat truststore:
cd <logstash-install-dir>/bin
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jksBuat file konfigurasi JAAS
Buat file bernama jaas.conf di direktori bin Logstash dengan konten berikut:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<your-username>"
password="<your-password>";
};Ganti placeholder dengan nilai aktual Anda:
| Placeholder | Description | Example |
|---|---|---|
<your-username> | Username dari bagian Configuration Information. | alikafka_pre-cn-v0h1\*\*\* |
<your-password> | Password dari bagian Configuration Information. | GQiSmqbQVe3b9hdKLDcIlkrBK6\*\*\* |
Pengaturan jaas_path berlaku untuk seluruh JVM. Jika Anda menjalankan beberapa output Kafka dalam satu instans Logstash dan memerlukan kredensial berbeda untuk masing-masing, gunakan parameter inline sasl_jaas_config sebagai gantinya. Lihat referensi Kafka output plugin untuk detail selengkapnya.
Buat file konfigurasi output Logstash
Buat file bernama output.conf di direktori bin Logstash dengan konten berikut:
input {
stdin {}
}
output {
stdout { codec => json }
kafka {
bootstrap_servers => "<your-endpoint>"
topic_id => "<your-topic>"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
jaas_path => "<logstash-install-dir>/bin/jaas.conf"
ssl_truststore_password => "KafkaOnsClient"
ssl_truststore_location => "<logstash-install-dir>/bin/kafka.client.truststore.jks"
ssl_endpoint_identification_algorithm => ""
}
}Ganti placeholder berikut dengan nilai aktual Anda:
| Placeholder | Description | Example |
|---|---|---|
<your-endpoint> | Titik akhir SSL (port 9093) dari bagian Endpoint Information. | alikafka-pre-cn-zv\*\*\*\*\*-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-3.alikafka.aliyuncs.com:9093 |
<your-topic> | Nama topik yang dibuat pada Langkah 2. | logstash_test |
<logstash-install-dir> | Jalur mutlak ke direktori instalasi Logstash Anda. | /home/logstash-7.6.2 |
Parameter sisanya menggunakan nilai tetap. Jangan ubah nilai-nilai tersebut:
| Parameter | Nilai tetap | Description |
|---|---|---|
security_protocol | SASL_SSL | Protokol keamanan untuk koneksi Internet. |
sasl_mechanism | PLAIN | Mekanisme otentikasi SASL. |
ssl_truststore_password | KafkaOnsClient | Kata sandi untuk sertifikat truststore. |
ssl_endpoint_identification_algorithm | "" (string kosong) | Diperlukan untuk Logstash 6.x dan versi lebih baru. Menonaktifkan verifikasi hostname. |
Kirim pesan uji coba
Jalankan Logstash dengan konfigurasi output:
./logstash -f output.confSetelah Logstash mulai berjalan, ketik
testlalu tekan Enter. Outputstdoutmenampilkan pesan secara lokal dalam format JSON, sedangkan outputkafkamengirimkannya ke instans ApsaraMQ for Kafka Anda.
Verifikasi hasil
Konfirmasi bahwa pesan telah sampai ke instans ApsaraMQ for Kafka Anda dengan memeriksa status partisi dan mengkueri pesan di Konsol.
Periksa status partisi
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.
Pada halaman Instances, klik nama instans target.
Pada panel navigasi sebelah kiri, klik Topics.
Klik nama topik, lalu klik tab Partition Status pada halaman Topic Details.
Parameter Description Partition ID ID partisi. Minimum Offset Offset paling awal dalam partisi. Maximum Offset Offset terbaru dalam partisi. Messages Jumlah total pesan dalam partisi. Last Updated At Waktu saat pesan terbaru disimpan. 
Kueri pesan berdasarkan offset
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.
Pada halaman Instances, klik nama instans target.
Pada panel navigasi sebelah kiri, klik Message Query.
Dari daftar drop-down Search Method, pilih Search by offset.
Pilih topik dari daftar drop-down Topic, pilih partisi dari daftar drop-down Partition, masukkan nilai offset pada bidang Offset, lalu klik Search. Konsol mengembalikan semua pesan dengan offset lebih besar dari atau sama dengan nilai yang ditentukan. Sebagai contoh, mengatur Partition ke
5dan Offset ke5akan mengembalikan semua pesan dari partisi 5 dengan offset 5 dan seterusnya.Parameter Description Partition Partisi tempat pesan disimpan. Offset Offset pesan dalam partisi. Key Kunci pesan, ditampilkan sebagai string. Value Isi pesan, ditampilkan sebagai string. Created At Timestamp saat pesan dikirim. Jika Anda menentukan nilai untuk bidang timestamp ProducerRecord, nilai yang ditentukan akan ditampilkan. Jika Anda tidak menentukan nilai, waktu sistem lokal saat pesan dikirim akan ditampilkan. Jika bidang timestamp diatur ke0atau nilai tidak valid, waktu akan ditampilkan dalam format1970/x/x x:x:x. Klien pada ApsaraMQ for Kafka versi 0.9 atau lebih lama tidak dapat mengatur bidang ini.Actions Download Key: Unduh kunci pesan. Download Value: Unduh isi pesan. Konsol menampilkan hingga 1 KB per pesan. Unduh pesan untuk melihat konten yang melebihi 1 KB. Maksimal 10 MB pesan dapat diunduh sekaligus.
Referensi
Kafka output plugin — Referensi lengkap parameter untuk plugin output Kafka Logstash.