Anda dapat menghubungkan ApsaraMQ for Kafka ke Filebeat sebagai output. Topik ini menjelaskan cara menggunakan Filebeat untuk mengirim pesan ke ApsaraMQ for Kafka melalui Internet.
Prasyarat
Sebelum memulai, lakukan langkah-langkah berikut:
Beli dan deploy instance ApsaraMQ for Kafka. Topik ini menggunakan contoh instance non-serverless. Untuk informasi selengkapnya, lihat Menghubungkan ke instance yang terhubung Internet dan VPC.
Unduh dan instal Filebeat. Untuk informasi selengkapnya, lihat Unduh Filebeat.
Unduh dan instal Java Development Kit (JDK) 8. Untuk informasi selengkapnya, lihat Unduh JDK 8.
Langkah 1: Mendapatkan titik akhir, nama pengguna, dan kata sandi
Filebeat terhubung ke ApsaraMQ for Kafka menggunakan titik akhir ApsaraMQ for Kafka.
Masuk ke Konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Pada halaman Instances, klik nama instance yang ingin Anda hubungkan ke Filebeat.
Di bagian Endpoint Information pada halaman Instance Details, lihat titik akhir instance tersebut. Di bagian Configuration Information, peroleh nilai parameter Username dan Password.
CatatanUntuk informasi mengenai perbedaan antara berbagai jenis titik akhir, lihat Perbandingan antar titik akhir.
Langkah 2: Membuat topik
Buat topik untuk menyimpan pesan.
Masuk ke Konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
PentingAnda harus membuat topik di wilayah tempat instance Elastic Compute Service (ECS) Anda dideploy. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan pada instance ECS yang dideploy di wilayah Tiongkok (Beijing), topik juga harus dibuat di wilayah Tiongkok (Beijing).
Pada halaman Instances, klik nama instance yang ingin Anda kelola.
Di panel navigasi sebelah kiri, klik Topics.
Pada halaman Topics, klik Create Topic.
Di panel Create Topic, tentukan properti topik dan klik OK.
Parameter
Deskripsi
Contoh
Name
Nama topik.
demo
Description
Deskripsi topik.
demo test
Partitions
Jumlah partisi dalam topik.
12
Storage Engine
CatatanAnda hanya dapat menentukan jenis mesin penyimpanan jika menggunakan instance Edisi Profesional non-serverless. Untuk jenis instance lainnya, Cloud Storage dipilih secara default.
Jenis mesin penyimpanan yang digunakan untuk menyimpan pesan dalam topik.
ApsaraMQ for Kafka mendukung jenis mesin penyimpanan berikut:
Cloud Storage: Jika Anda memilih nilai ini, sistem menggunakan disk Alibaba Cloud untuk topik dan menyimpan data dalam tiga replika secara terdistribusi. Mesin penyimpanan ini memiliki latensi rendah, performa tinggi, daya tahan lama, dan keandalan tinggi. Jika Anda mengatur parameter Instance Edition ke Standard (High Write) saat membuat instance, Anda hanya dapat mengatur parameter ini ke Cloud Storage.
Local Storage: Jika Anda memilih nilai ini, sistem menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dan menyimpan data dalam tiga replika secara terdistribusi.
Cloud Storage
Message Type
Jenis pesan topik. Nilai yang valid:
Normal Message: Secara default, pesan yang memiliki kunci yang sama disimpan dalam partisi yang sama sesuai urutan pengiriman. Jika broker dalam kluster gagal, urutan pesan yang disimpan dalam partisi mungkin tidak terjaga. Jika Anda mengatur parameter Storage Engine ke Cloud Storage, parameter ini secara otomatis diatur ke Normal Message.
Partitionally Ordered Message: Secara default, pesan yang memiliki kunci yang sama disimpan dalam partisi yang sama sesuai urutan pengiriman. Jika broker dalam kluster gagal, pesan tetap disimpan dalam partisi sesuai urutan pengiriman. Pesan dalam beberapa partisi tidak dapat dikirim hingga partisi tersebut dipulihkan. Jika Anda mengatur parameter Storage Engine ke Local Storage, parameter ini secara otomatis diatur ke Partitionally Ordered Message.
Normal Message
Log Cleanup Policy
Kebijakan pembersihan log yang digunakan oleh topik.
Jika Anda mengatur parameter Storage Engine ke Local Storage, Anda harus mengonfigurasi parameter Log Cleanup Policy. Anda hanya dapat mengatur parameter Mesin Penyimpanan ke Local Storage jika menggunakan instance ApsaraMQ for Kafka Edisi Profesional.
ApsaraMQ for Kafka menyediakan kebijakan pembersihan log berikut:
Delete: kebijakan pembersihan log default. Jika ruang penyimpanan yang tersedia mencukupi, pesan disimpan berdasarkan periode retensi maksimum. Setelah penggunaan penyimpanan melebihi 85%, sistem akan menghapus pesan yang paling awal disimpan untuk memastikan ketersediaan layanan.
Compact: kebijakan kompaksi log yang digunakan dalam Apache Kafka. Kompaksi log memastikan bahwa nilai terbaru dipertahankan untuk pesan yang memiliki kunci yang sama. Kebijakan ini cocok untuk skenario seperti memulihkan sistem yang gagal atau memuat ulang cache setelah sistem dimulai ulang. Misalnya, ketika Anda menggunakan Kafka Connect atau Confluent Schema Registry, Anda harus menyimpan informasi tentang status dan konfigurasi sistem dalam topik dengan kompaksi log.
PentingAnda hanya dapat menggunakan topik dengan kompaksi log pada komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi selengkapnya, lihat aliware-kafka-demos.
Compact
Tag
Tag yang ingin Anda lampirkan ke topik.
demo
Setelah topik dibuat, Anda dapat melihat topik tersebut di halaman Topics.
Langkah 3: Menggunakan Filebeat untuk mengirim pesan
Jalankan Filebeat pada mesin tempat Filebeat diinstal untuk mengirim pesan ke topik yang telah Anda buat.
Jalankan perintah cd untuk beralih ke direktori instalasi Filebeat.
Jalankan perintah berikut untuk mengunduh file sertifikat CA.
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20220826/ytsw/only-4096-ca-certBuat file konfigurasi output.conf.
Jalankan perintah
vim output.confuntuk membuat dan membuka file konfigurasi.Tekan tombol i untuk memasuki mode insert.
Masukkan konten berikut.
filebeat.inputs: - type: stdin output.kafka: hosts: ["alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093", "alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093", "alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"] username: "alikafka_pre-cn-v641e1d***" password: "aeN3WLRoMPRXmAP2jvJuGk84Kuuo***" topic: 'filebeat_test' partition.round_robin: reachable_only: false ssl.certificate_authorities: ["/home/admin/filebeat/filebeat-7.7.0-linux-x86_64/tasks/vpc_ssl/ca-cert"] ssl.verification_mode: none required_acks: 1 compression: none max_message_bytes: 1000000Parameter
Deskripsi
Contoh
hosts
Titik akhir publik yang disediakan oleh ApsaraMQ for Kafka adalah titik akhir SSL.
alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093, alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093, alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
username
Nama pengguna instance yang terhubung Internet dan VPC.
alikafka_pre-cn-v641e1d***
password
Kata sandi instance yang terhubung Internet dan VPC.
aeN3WLRoMPRXmAP2jvJuGk84Kuuo***
topic
Nama topik.
filebeat_test
reachable_only
Menentukan apakah pesan hanya dikirim ke partisi yang tersedia. Nilai yang valid:
true: Jika partisi primer tidak tersedia, output mungkin diblokir.
false: Bahkan jika partisi primer tidak tersedia, output tidak diblokir.
false
ssl.certificate_authorities
Jalur tempat sertifikat CA disimpan.
/home/admin/filebeat/filebeat-7.7.0-linux-x86_64/ca-cert
ssl.verification_mode
Mode autentikasi.
none
required_acks
Tingkat keandalan acknowledgment (ACK). Nilai yang valid:
0: Tidak memerlukan tanggapan.
1: Sistem menunggu commit lokal.
-1: Sistem menunggu semua replika melakukan commit.
Nilai default: 1.
1
compression
Penyandi kompresi data. Nilai default: gzip. Nilai yang valid:
None.
snappy: paket pengembangan C++ untuk kompresi dan dekompresi.
lz4: algoritma kompresi data tanpa kehilangan informasi yang berfokus pada kecepatan kompresi dan dekompresi.
gzip: program kompresi file untuk perangkat lunak bebas GNU.
none
max_message_bytes
Ukuran pesan maksimum dalam byte. Nilai default: 1000000. Nilai ini harus lebih kecil dari ukuran pesan maksimum yang Anda konfigurasikan untuk instance ApsaraMQ for Kafka Anda.
1000000
Untuk informasi selengkapnya mengenai parameter, lihat Plugin output Kafka.
Tekan tombol Esc untuk kembali ke mode CLI.
Tekan tombol : untuk memasuki mode bottom line. Masukkan wq dan tekan tombol Enter untuk menyimpan file dan keluar.
Kirim pesan ke topik yang telah dibuat.
Jalankan
./filebeat -c ./output.yml.Masukkan test dan tekan tombol Enter.
Langkah 4: Melihat partisi topik
Anda dapat melihat status pesan yang dikirim ke topik.
Masuk ke Konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Pada halaman Instances, klik nama instance yang ingin Anda kelola.
Di panel navigasi sebelah kiri, klik Topics.
Pada halaman Topics, klik nama topik yang ingin Anda kelola. Pada halaman Topic Details, klik tab Partition Status.Name
Tabel 1. Parameter yang termasuk dalam status partisi
Parameter
Deskripsi
ID Partisi
ID partisi.
Offset Minimum
Offset minimum dalam partisi.
Offset Maksimum
Offset maksimum dalam partisi.
Pesan
Jumlah pesan dalam partisi.
Terakhir Diperbarui Pada
Waktu saat pesan terakhir dalam partisi disimpan.

Langkah 5: Menanyakan pesan berdasarkan offset
Anda dapat menanyakan pesan yang dikirim berdasarkan ID partisi dan offset-nya.
Masuk ke Konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Pada halaman Instances, klik nama instance yang ingin Anda kelola.
Di panel navigasi sebelah kiri, klik Message Query.
Pada halaman Message Query, pilih Search by offset dari daftar drop-down Search Method.
Pilih topik dari daftar drop-down Topic dan partisi dari daftar drop-down Partition, masukkan nilai offset di kolom Offset, lalu klik Search.
Pesan yang memiliki nilai offset lebih besar dari atau sama dengan nilai offset yang ditentukan akan ditampilkan. Misalnya, jika Anda menentukan 5 sebagai nilai parameter Partisi dan Offset, sistem akan menanyakan pesan yang offset-nya sama dengan atau lebih besar dari 5 dari Partisi 5.
Tabel 2. Parameter yang termasuk dalam hasil pencarian pesan
Parameter
Deskripsi
Partition
Partisi tempat pesan diambil.
Offset
Offset pesan.
Key
Kunci pesan. Kunci dikonversi menjadi string.
Value
Nilai pesan, juga dikenal sebagai isi pesan. Nilai pesan dikonversi menjadi string.
Created At
Titik waktu saat pesan dikirim. Nilainya adalah timestamp yang direkam klien saat pesan dikirim atau nilai dari field timestamp yang Anda tentukan untuk objek
ProducerRecord.CatatanJika Anda menentukan nilai untuk field timestamp, nilai yang ditentukan akan ditampilkan.
Jika Anda tidak menentukan nilai untuk field timestamp, waktu sistem saat pesan dikirim akan ditampilkan.
Nilai dalam format 1970/x/x x:x:x menunjukkan bahwa field timestamp ditentukan sebagai 0 atau nilai yang tidak valid.
Anda tidak dapat menentukan field timestamp pada klien ApsaraMQ for Kafka versi 0.9 atau lebih lama.
Actions
Klik Download Key untuk mengunduh kunci pesan.
Klik Download Value untuk mengunduh isi pesan.
PentingHingga 1 KB konten untuk setiap pesan yang diambil dapat ditampilkan di Konsol ApsaraMQ for Kafka. Jika pesan yang diambil melebihi 1 KB, sistem secara otomatis memotong kontennya. Jika Anda ingin melihat konten pesan lengkap, unduh pesan tersebut.
Anda dapat mengunduh hingga 10 MB pesan yang diambil sekaligus. Jika pesan yang diambil melebihi 10 MB, hanya 10 MB pertama dari konten pesan yang dapat diunduh.