All Products
Search
Document Center

ApsaraMQ for Kafka:Gunakan SDK untuk Ruby untuk mengirim dan berlangganan pesan

Last Updated:Jul 06, 2025

Tema ini menjelaskan cara menggunakan SDK untuk Ruby untuk terhubung ke titik akhir dari instance ApsaraMQ for Kafka serta mengirim dan berlangganan pesan.

Persyaratan lingkungan

Ruby telah diinstal. Untuk informasi lebih lanjut, lihat Ruby.

Instal pustaka Ruby

Jalankan perintah berikut untuk menginstal pustaka Ruby:

gem install ruby-kafka -v 0.6.8

Siapkan file konfigurasi

  1. Opsional:Unduh sertifikat root SSL. Jika Anda menggunakan titik akhir SSL untuk terhubung ke instance Message Queue for Apache Kafka, Anda harus mengunduh sertifikat ini.

  2. Buka halaman aliware-kafka-demos, klik download untuk mengunduh proyek demo ke mesin lokal Anda, lalu ekstrak paket proyek tersebut.

  3. Di dalam paket yang diekstrak, masuk ke folder kafka-ruby-demo. Kemudian, buka folder yang sesuai berdasarkan titik akhir yang ingin digunakan, dan konfigurasikan file producer.ruby dan consumer.ruby di dalam folder tersebut.

    Tabel 1. Parameter

    Parameter

    Deskripsi

    brokers

    Titik akhir SSL dari instance Message Queue for Apache Kafka. Anda dapat memperoleh titik akhir SSL di bagian Endpoint Information pada halaman Instance Details di Konsol ApsaraMQ for Kafka.

    topic

    Nama topik. Anda dapat memperoleh nama topik di halaman Topics di Konsol ApsaraMQ for Kafka.

    username

    Nama pengguna Simple Authentication and Security Layer (SASL). Jika Anda menggunakan titik akhir default untuk terhubung ke instance Message Queue for Apache Kafka, parameter ini tidak termasuk.

    Catatan
    • Jika fitur ACL tidak diaktifkan untuk instance ApsaraMQ for Kafka, Anda dapat memperoleh nama pengguna dan kata sandi pengguna SASL dari parameter Username dan Password di bagian Configuration Information pada halaman Instance Details di Konsol ApsaraMQ for Kafka.

    • Jika fitur ACL diaktifkan untuk instance ApsaraMQ for Kafka, pastikan bahwa pengguna SASL memiliki otorisasi untuk mengirim dan menerima pesan menggunakan instance tersebut. Untuk informasi lebih lanjut, lihat Berikan izin kepada pengguna SASL.

    password

    Kata sandi pengguna SASL. Jika Anda menggunakan titik akhir default untuk terhubung ke instance Message Queue for Apache Kafka, parameter ini tidak termasuk.

    consumerGroup

    ID grup konsumen. Anda dapat memperoleh ID grup konsumen di halaman Groups di Konsol ApsaraMQ for Kafka.

  4. Setelah parameter yang diperlukan dikonfigurasi, unggah semua file di folder tempat file konfigurasi berada ke direktori instalasi Ruby di server Anda. Folder yang sesuai dengan titik akhir SSL berisi file sertifikat root SSL.

Kirim pesan

Jalankan perintah berikut untuk menjalankan producer.ruby guna mengirim pesan:

ruby producer.ruby

Untuk informasi tentang parameter dalam kode, lihat Parameter.

Contoh kode berikut memberikan contoh producer.ruby:

Catatan

Dalam contoh kode, titik akhir SSL digunakan. Hapus atau modifikasi kode terkait parameter berdasarkan titik akhir yang Anda gunakan untuk terhubung ke instance Message Queue for Apache Kafka, serta modifikasi kode lainnya berdasarkan komentar yang diformat tebal.

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"

kafka = Kafka.new(
    seed_brokers: brokers,
    client_id: "sasl-producer", # Jika Anda menggunakan titik akhir default, ubah nilainya menjadi simple-producer. 
    logger: logger,
    # letakkan "./cert.pem" di mana saja agar bisa dibaca
    # Jika Anda menggunakan titik akhir default, hapus tiga baris berikut: 
    ssl_ca_cert: File.read('./cert.pem'),    
    sasl_plain_username: username,
    sasl_plain_password: password,
    )

producer = kafka.producer

begin
    $stdin.each_with_index do |line, index|

    producer.produce(line, topic: topic)

    producer.deliver_messages
end

ensure

    producer.deliver_messages

    producer.shutdown
end

Berlangganan pesan

Jalankan perintah berikut untuk menjalankan consumer.ruby guna mengonsumsi pesan:

ruby consumer.ruby

Contoh kode berikut memberikan contoh consumer.ruby:

Untuk informasi tentang parameter dalam kode, lihat Parameter.

Catatan

Dalam contoh kode, titik akhir SSL digunakan. Hapus atau modifikasi kode terkait parameter berdasarkan titik akhir yang Anda gunakan untuk terhubung ke instance Message Queue for Apache Kafka, serta modifikasi kode lainnya berdasarkan komentar yang diformat tebal.

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"

kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-consumer", # Jika Anda menggunakan titik akhir default, ubah nilainya menjadi test.
        socket_timeout: 20,
        logger: logger,
        # letakkan "./cert.pem" di mana saja agar bisa dibaca
        # Jika Anda menggunakan titik akhir default, hapus tiga baris berikut: 
        ssl_ca_cert: File.read('./cert.pem'),
        sasl_plain_username: username,
        sasl_plain_password: password,
        )

consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)

trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
    consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Dapatkan pesan: #{message.value}")
    end
rescue Kafka::ProcessingError => e
    warn "Terjadi kesalahan: #{e.cause}"
    consumer.pause(e.topic, e.partition, timeout: 20)

    retry
end