Tema ini menjelaskan cara menggunakan SDK untuk Ruby untuk terhubung ke titik akhir dari instance ApsaraMQ for Kafka serta mengirim dan berlangganan pesan.
Persyaratan lingkungan
Ruby telah diinstal. Untuk informasi lebih lanjut, lihat Ruby.
Instal pustaka Ruby
Jalankan perintah berikut untuk menginstal pustaka Ruby:
gem install ruby-kafka -v 0.6.8Siapkan file konfigurasi
Opsional:Unduh sertifikat root SSL. Jika Anda menggunakan titik akhir SSL untuk terhubung ke instance Message Queue for Apache Kafka, Anda harus mengunduh sertifikat ini.
Buka halaman aliware-kafka-demos, klik
untuk mengunduh proyek demo ke mesin lokal Anda, lalu ekstrak paket proyek tersebut.Di dalam paket yang diekstrak, masuk ke folder kafka-ruby-demo. Kemudian, buka folder yang sesuai berdasarkan titik akhir yang ingin digunakan, dan konfigurasikan file producer.ruby dan consumer.ruby di dalam folder tersebut.
Tabel 1. Parameter Parameter
Deskripsi
brokers
Titik akhir SSL dari instance Message Queue for Apache Kafka. Anda dapat memperoleh titik akhir SSL di bagian Endpoint Information pada halaman Instance Details di Konsol ApsaraMQ for Kafka.
topic
Nama topik. Anda dapat memperoleh nama topik di halaman Topics di Konsol ApsaraMQ for Kafka.
username
Nama pengguna Simple Authentication and Security Layer (SASL). Jika Anda menggunakan titik akhir default untuk terhubung ke instance Message Queue for Apache Kafka, parameter ini tidak termasuk.
CatatanJika fitur ACL tidak diaktifkan untuk instance ApsaraMQ for Kafka, Anda dapat memperoleh nama pengguna dan kata sandi pengguna SASL dari parameter Username dan Password di bagian Configuration Information pada halaman Instance Details di Konsol ApsaraMQ for Kafka.
Jika fitur ACL diaktifkan untuk instance ApsaraMQ for Kafka, pastikan bahwa pengguna SASL memiliki otorisasi untuk mengirim dan menerima pesan menggunakan instance tersebut. Untuk informasi lebih lanjut, lihat Berikan izin kepada pengguna SASL.
password
Kata sandi pengguna SASL. Jika Anda menggunakan titik akhir default untuk terhubung ke instance Message Queue for Apache Kafka, parameter ini tidak termasuk.
consumerGroup
ID grup konsumen. Anda dapat memperoleh ID grup konsumen di halaman Groups di Konsol ApsaraMQ for Kafka.
Setelah parameter yang diperlukan dikonfigurasi, unggah semua file di folder tempat file konfigurasi berada ke direktori instalasi Ruby di server Anda. Folder yang sesuai dengan titik akhir SSL berisi file sertifikat root SSL.
Kirim pesan
Jalankan perintah berikut untuk menjalankan producer.ruby guna mengirim pesan:
ruby producer.rubyUntuk informasi tentang parameter dalam kode, lihat Parameter.
Contoh kode berikut memberikan contoh producer.ruby:
Dalam contoh kode, titik akhir SSL digunakan. Hapus atau modifikasi kode terkait parameter berdasarkan titik akhir yang Anda gunakan untuk terhubung ke instance Message Queue for Apache Kafka, serta modifikasi kode lainnya berdasarkan komentar yang diformat tebal.
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-producer", # Jika Anda menggunakan titik akhir default, ubah nilainya menjadi simple-producer.
logger: logger,
# letakkan "./cert.pem" di mana saja agar bisa dibaca
# Jika Anda menggunakan titik akhir default, hapus tiga baris berikut:
ssl_ca_cert: File.read('./cert.pem'),
sasl_plain_username: username,
sasl_plain_password: password,
)
producer = kafka.producer
begin
$stdin.each_with_index do |line, index|
producer.produce(line, topic: topic)
producer.deliver_messages
end
ensure
producer.deliver_messages
producer.shutdown
endBerlangganan pesan
Jalankan perintah berikut untuk menjalankan consumer.ruby guna mengonsumsi pesan:
ruby consumer.rubyContoh kode berikut memberikan contoh consumer.ruby:
Untuk informasi tentang parameter dalam kode, lihat Parameter.
Dalam contoh kode, titik akhir SSL digunakan. Hapus atau modifikasi kode terkait parameter berdasarkan titik akhir yang Anda gunakan untuk terhubung ke instance Message Queue for Apache Kafka, serta modifikasi kode lainnya berdasarkan komentar yang diformat tebal.
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-consumer", # Jika Anda menggunakan titik akhir default, ubah nilainya menjadi test.
socket_timeout: 20,
logger: logger,
# letakkan "./cert.pem" di mana saja agar bisa dibaca
# Jika Anda menggunakan titik akhir default, hapus tiga baris berikut:
ssl_ca_cert: File.read('./cert.pem'),
sasl_plain_username: username,
sasl_plain_password: password,
)
consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }
begin
consumer.each_message(max_bytes: 64 * 1024) do |message|
logger.info("Dapatkan pesan: #{message.value}")
end
rescue Kafka::ProcessingError => e
warn "Terjadi kesalahan: #{e.cause}"
consumer.pause(e.topic, e.partition, timeout: 20)
retry
end