All Products
Search
Document Center

ApsaraMQ for Kafka:Use the SDK for Ruby to send and subscribe to messages

Last Updated:Aug 18, 2023

This topic describes how to use the SDK for Ruby to connect to an endpoint of a ApsaraMQ for Kafka instance and send and subscribe to messages.

Environment requirements

Ruby is installed. For more information, see Download Ruby.

Install the Ruby dependency library

Run the following command to install the Ruby dependency library:
gem install ruby-kafka -v 0.6.8

Prepare configuration files

  1. Optional:Download the SSL root certificate. If you use the SSL endpoint to connect to your Message Queue for Apache Kafka instance, you must download this certificate.

  2. Go to the Aliware-kafka-demos page, click the download icon to download the demo project to your on-premises machine, and then decompress the demo project.

  3. In the decompressed package, go to the kafka-ruby-demo folder. Then, open the corresponding folder based on the endpoint that you want to use, and configure the producer.ruby file and the consumer.ruby file in the folder.

    Table 1. Parameters

    Parameter

    Description

    brokers

    The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    topic

    The name of the topic. You can obtain the name of the topic on the Topics page in the ApsaraMQ for Kafka console.

    username

    The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.

    Note
    • If the ACL feature is not enabled for your ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
    • If the ACL feature is enabled for your ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.

    password

    The password of the SASL user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.

    consumerGroup

    The ID of the consumer group. You can obtain the ID of the consumer group on the Groups page in the ApsaraMQ for Kafka console.

  4. After the required parameters are configured, upload all files in the folder in which the configuration file is located to the Ruby installation directory on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.

Send messages

Run the following command to run producer.ruby to send messages:

ruby producer.ruby

For information about the parameters in the code, see Parameters.

The following sample code provides an example of producer.ruby:

Note

In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the Message Queue for Apache Kafka instance, and modify the other code based on the comments that are formatted in bold.

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"

kafka = Kafka.new(
    seed_brokers: brokers,
    client_id: "sasl-producer", # If you use the default endpoint, change the value to simple-producer. 
    logger: logger,
    # put "./cert.pem" to anywhere this can read
    # If you use the default endpoint, delete the following three lines: 
    ssl_ca_cert: File.read('./cert.pem'),    
    sasl_plain_username: username,
    sasl_plain_password: password,
    )

producer = kafka.producer

begin
    $stdin.each_with_index do |line, index|

    producer.produce(line, topic: topic)

    producer.deliver_messages
end

ensure

    producer.deliver_messages

    producer.shutdown
end

Subscribe to messages

Run the following command to run consumer.ruby to consume messages:

ruby consumer.ruby

The following sample code provides an example of consumer.ruby:

For information about the parameters in the code, see Parameters.

Note

In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the Message Queue for Apache Kafka instance, and modify the other code based on the comments that are formatted in bold.

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"

kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-consumer", # If you use the default endpoint, change the value to test.
        socket_timeout: 20,
        logger: logger,
        # put "./cert.pem" to anywhere this can read
        # If you use the default endpoint, delete the following three lines: 
        ssl_ca_cert: File.read('./cert.pem'),
        sasl_plain_username: username,
        sasl_plain_password: password,
        )

consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)

trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
    consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Get message: #{message.value}")
    end
rescue Kafka::ProcessingError => e
    warn "Got error: #{e.cause}"
    consumer.pause(e.topic, e.partition, timeout: 20)

    retry
end